Spring Batch custom completion policy for dynamic chunk size

Problem description

Context

We have a batch job that replicates localized country names (i.e. translations of country names into different languages) from an external DB into ours. The idea was to process all localized names for a single country in one chunk (i.e. first chunk: all translations for Andorra, next chunk: all translations for the U.A.E., etc.). We use a JdbcCursorItemReader to read the external data, plus some Oracle analytic functions that provide the total number of translations available for each country. The query looks something like this:

select country_code, language_code, localized_name, COUNT(1) OVER(PARTITION BY c_lng.country_code) as lng_count
from EXT_COUNTRY_LNG c_lng
order by c_lng.country_code, c_lng.language_code
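If you don't use Oracle analytic functions often, this stdlib-only Java sketch shows the value that `lng_count` holds. For every row, it is the number of rows sharing that row's `country_code`. The class and field names (`LngCountWindow`, `Row`) are hypothetical. The code only models the SQL and does not touch a database.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Models what COUNT(1) OVER (PARTITION BY country_code) adds to each row:
// the total number of rows that share the row's country_code.
final class LngCountWindow {

    static final class Row {
        final String countryCode;
        final String languageCode;
        final long lngCount; // the value the analytic function would return

        Row(String countryCode, String languageCode, long lngCount) {
            this.countryCode = countryCode;
            this.languageCode = languageCode;
            this.lngCount = lngCount;
        }
    }

    /** rows are {countryCode, languageCode}; input order is preserved. */
    static List<Row> withLngCount(List<String[]> rows) {
        // First pass: size of each "partition", i.e. rows per country
        Map<String, Long> counts = new HashMap<>();
        for (String[] r : rows) {
            counts.merge(r[0], 1L, Long::sum);
        }
        // Second pass: attach the partition size to every row
        List<Row> result = new ArrayList<>();
        for (String[] r : rows) {
            result.add(new Row(r[0], r[1], counts.get(r[0])));
        }
        return result;
    }
}
```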

Problem

So cutting this input into chunks looks simple: close the chunk once you've read exactly the number of rows given in lng_count, and start a new one with the next row read. In practice, however, it turns out not to be that simple :(
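The chunking rule described above can be sketched in plain Java, independent of Spring Batch. Each row is modelled as a hypothetical `(countryCode, lngCount)` pair in the query's `ORDER BY` order. The sketch assumes what the query guarantees: rows of one country are contiguous and all carry the same `lng_count`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: close a chunk once it holds as many rows as their lng_count.
final class LngCountChunker {

    /** Each entry is (countryCode, lngCount); returns the country codes grouped per chunk. */
    static List<List<String>> chunk(List<Map.Entry<String, Integer>> rows) {
        List<List<String>> chunks = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Map.Entry<String, Integer> row : rows) {
            current.add(row.getKey());
            // Rows of one country are adjacent and share the same lng_count,
            // so the chunk is full once its size reaches that value
            if (current.size() >= row.getValue()) {
                chunks.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            chunks.add(current); // leftover rows, only possible if lng_count was inconsistent
        }
        return chunks;
    }
}
```

For example, three Andorra rows with `lng_count = 3` followed by two U.A.E. rows with `lng_count = 2` yield one chunk of three and one chunk of two.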

The first thing to try is a custom completion policy. The problem is that the policy has no access to the last item read by the ItemReader. You have to explicitly put that item into the context in the reader and fetch it back in the policy. I don't like that, because it requires additional reader modifications or extra reader listeners. I also don't like the same item being serialized and deserialized back and forth. Finally, I don't feel that JobContext/StepContext is a good place for this kind of data.

There's also RepeatContext, which looks like a better place for such data, but I couldn't find an easy way to get at it...

So in the end we settled on a solution like this:

@Bean(name = "localizedCountryNamesStep")
@JobScope
public Step insertCountryStep(
        final StepBuilderFactory stepBuilderFactory,
        final MasterdataCountryNameReader countryNameReader,
        final MasterdataCountryNameProcessor countryNameProcessor,
        final MasterdataCountryNameWriter writer) {
    /* Use the same fixed-commit policy, but update its chunk size dynamically */
    final SimpleCompletionPolicy policy = new SimpleCompletionPolicy();
    return stepBuilderFactory.get("localizedCountryNamesStep")
            .<ExtCountryLng, LocalizedCountryName> chunk(policy)
            .reader(countryNameReader)
            .listener(new ItemReadListener<ExtCountryLng>() {

                @Override
                public void beforeRead() {
                    // do nothing
                }

                @Override
                public void afterRead(final ExtCountryLng item) {
                    /* Update the chunk size after every read: subsequent reads within the same
                    country (= same chunk) change nothing, since lngCount is always the same there */
                    policy.setChunkSize(item.getLngCount());
                }

                @Override
                public void onReadError(final Exception ex) {
                    // do nothing
                }
            })
            .processor(countryNameProcessor)
            .writer(writer)
            .faultTolerant()
            .skip(RuntimeException.class)
            .skipLimit(Integer.MAX_VALUE) // Batch does not support unlimited skip
            .retryLimit(0) // this solution disables only retry, but not recover
            .build();
}
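The stdlib-only model below shows how the listener-based workaround behaves. It is not Spring code. `MutableSizePolicy` is a hypothetical stand-in for `SimpleCompletionPolicy`, simplified to "complete once the chunk holds `chunkSize` items". The `setChunkSize` call after each read plays the role of `afterRead()` calling `setChunkSize(item.getLngCount())`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Stdlib-only model of the workaround: one shared policy whose size is overwritten after every read.
final class DynamicChunkSizeModel {

    /** Simplified stand-in for SimpleCompletionPolicy (hypothetical, not the Spring class). */
    static final class MutableSizePolicy {
        private int chunkSize = 1;

        void setChunkSize(int chunkSize) {
            this.chunkSize = chunkSize;
        }

        boolean isComplete(int itemsInChunk) {
            return itemsInChunk >= chunkSize;
        }
    }

    /** rows are lng_count values in read order; returns the number of items in each chunk. */
    static List<Integer> run(List<Integer> rows) {
        MutableSizePolicy policy = new MutableSizePolicy();
        Iterator<Integer> reader = rows.iterator();
        List<Integer> chunkSizes = new ArrayList<>();
        while (reader.hasNext()) {
            int itemsInChunk = 0;
            do {
                int lngCount = reader.next();   // models read()
                policy.setChunkSize(lngCount);  // models the afterRead() listener
                itemsInChunk++;
            } while (reader.hasNext() && !policy.isComplete(itemsInChunk));
            chunkSizes.add(itemsInChunk);
        }
        return chunkSizes;
    }
}
```

The trick relies on two things: the policy is a single shared instance, and every row of a country carries the same `lng_count`. If the counts ever disagreed within a country, the chunk boundaries would no longer line up with countries.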

It works and requires minimal code changes, but it still feels a bit ugly to me. So I'm wondering: is there a more elegant way to get a dynamic chunk size in Spring Batch when all the required information is already available in the ItemReader?

Recommended answer

The easiest way would be to simply partition your step by country. That way each country gets its own step, and you can also process several countries on parallel threads for better performance.
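Here is a rough, stdlib-only analogy for partitioning by country. It does not use the Spring Batch partitioning API. The sketch groups rows per country and submits each group to a thread pool as an independent unit of work, much like partitioned slave steps running on a task executor. `PerCountryParallel` and its placeholder task are hypothetical. A real partition would read, process and write its own rows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical analogy: one independent unit of work per country, run on a thread pool.
final class PerCountryParallel {

    /** rows are {countryCode, languageCode}; returns the number of rows handled per country. */
    static Map<String, Integer> process(List<String[]> rows, int threads) {
        // "Partition" the input: one group of rows per country, in first-seen order
        Map<String, List<String[]>> byCountry = new LinkedHashMap<>();
        for (String[] row : rows) {
            byCountry.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Map<String, Future<Integer>> futures = new LinkedHashMap<>();
            for (Map.Entry<String, List<String[]>> e : byCountry.entrySet()) {
                List<String[]> countryRows = e.getValue();
                // Placeholder unit of work: just reports how many rows the country has
                Callable<Integer> task = countryRows::size;
                futures.put(e.getKey(), pool.submit(task));
            }
            Map<String, Integer> handled = new LinkedHashMap<>();
            for (Map.Entry<String, Future<Integer>> e : futures.entrySet()) {
                handled.put(e.getKey(), e.getValue().get());
            }
            return handled;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```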

If it needs to be a single reader, you can wrap a delegate PeekableItemReader in a class that also extends SimpleCompletionPolicy:

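import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.PeekableItemReader;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;

/**
 * Acts as both the step's ItemReader and its chunk completion policy:
 * a chunk ends as soon as the next (peeked) item belongs to another country.
 * If the delegate is an ItemStream, register it as a stream on the step so it gets opened and closed.
 */
public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem> {

    private PeekableItemReader<? extends CountrySpecificItem> delegate;

    private CountrySpecificItem currentReadItem = null;

    // Required for the <property name="delegate"> injection in the XML configuration
    public void setDelegate(final PeekableItemReader<? extends CountrySpecificItem> delegate) {
        this.delegate = delegate;
    }

    @Override
    public CountrySpecificItem read() throws Exception {
        currentReadItem = delegate.read();
        return currentReadItem;
    }

    @Override
    public RepeatContext start(final RepeatContext context) {
        return new ComparisonPolicyTerminationContext(context);
    }

    protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext {

        public ComparisonPolicyTerminationContext(final RepeatContext context) {
            super(context);
        }

        @Override
        public boolean isComplete() {
            final CountrySpecificItem nextReadItem;
            try {
                // peek() declares checked exceptions, which isComplete() cannot rethrow
                nextReadItem = delegate.peek();
            } catch (final Exception e) {
                throw new IllegalStateException("Could not peek at the next item", e);
            }
            // Close the chunk at end of input or when the next item is for another country
            if (currentReadItem == null || nextReadItem == null) {
                return true;
            }
            return !currentReadItem.isSameCountry(nextReadItem);
        }
    }
}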

Then in your XML application context you would define:

<batch:tasklet>
    <batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter" />
</batch:tasklet>

<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader">
     <property name="delegate" ref="peekableReader" />
</bean>


<bean id="peekableReader" class="YourPeekableItemReader" />
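The answer's completion rule can be modelled without Spring to see where the chunks end. In this hypothetical sketch, `PeekingReader` is a minimal one-item look-ahead over an `Iterator`, similar in spirit to a `PeekableItemReader`. Like `ItemReader.read()`, it returns `null` at end of input. Items are reduced to plain country codes, so `isSameCountry` becomes `equals`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Stdlib-only model (not Spring code) of the peek-ahead completion rule.
final class PeekingChunker {

    /** Minimal single-item look-ahead; assumes the source contains no null elements. */
    static final class PeekingReader<T> {
        private final Iterator<T> source;
        private T peeked; // buffered look-ahead item; null when nothing is buffered

        PeekingReader(Iterator<T> source) {
            this.source = source;
        }

        /** Next item, or null at end of input. */
        T read() {
            if (peeked != null) {
                T item = peeked;
                peeked = null;
                return item;
            }
            return source.hasNext() ? source.next() : null;
        }

        /** Next item without consuming it, or null at end of input. */
        T peek() {
            if (peeked == null && source.hasNext()) {
                peeked = source.next();
            }
            return peeked;
        }
    }

    /** Items are country codes; returns the chunks produced by the peek-ahead rule. */
    static List<List<String>> chunk(List<String> countryCodes) {
        PeekingReader<String> reader = new PeekingReader<>(countryCodes.iterator());
        List<List<String>> chunks = new ArrayList<>();
        List<String> current = new ArrayList<>();
        String item;
        while ((item = reader.read()) != null) {
            current.add(item);
            String next = reader.peek();
            // isComplete(): end of input, or the next item is for another country
            if (next == null || !next.equals(item)) {
                chunks.add(current);
                current = new ArrayList<>();
            }
        }
        return chunks;
    }
}
```

Unlike the `lng_count` approach, this rule needs no precomputed count. It only assumes that the input is ordered by country.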

Edit: Thinking back over your issue, partitioning strikes me as the cleanest approach. With a partitioned step, each ItemReader (make sure it has scope="step") is passed a single countryName from the step execution context. Yes, you'll need a custom Partitioner class to build up your map of execution contexts (one entry per country). You'll also need a hard-coded commit interval large enough to accommodate your largest unit of work. After that, everything is pure boilerplate. And since each slave step will be only a single chunk, restarting any countries that hit issues should be a relative breeze.
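The core job of a custom Partitioner here is to build one execution context per country. In Spring Batch the contract is `Partitioner.partition(int gridSize)`, which returns a `Map<String, ExecutionContext>`. The stdlib-only sketch below models each context as a plain `Map`. The class name and the `partition-<country>` keys are hypothetical; the `countryName` key follows the answer's wording.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stdlib analogue of a per-country Partitioner (not the Spring interface).
final class CountryPartitionerSketch {

    /** Assumes countryNames contains each country once; returns one context per partition. */
    static Map<String, Map<String, Object>> partition(List<String> countryNames) {
        Map<String, Map<String, Object>> partitions = new LinkedHashMap<>();
        for (String country : countryNames) {
            // Models the ExecutionContext that a step-scoped reader would receive
            Map<String, Object> context = new LinkedHashMap<>();
            context.put("countryName", country);
            partitions.put("partition-" + country, context);
        }
        return partitions;
    }
}
```

A step-scoped reader could then pick the value up through late binding, e.g. `#{stepExecutionContext['countryName']}`, and restrict its query to that country.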