How to optimize performance using FlatFileItemReader and asynchronous processors


Problem description

I have a simple CSV file with ~400,000 lines (one column only). It takes me a lot of time to read the records and process them.

The processor validates each record against Couchbase.

The writer writes to a remote topic. The whole run takes me around 30 minutes. That's insane.

I read that FlatFileItemReader is not thread-safe, so my chunk value is 1.

I read that asynchronous processing could help, but I can't see any improvement.

Here is my code:

@Configuration
@EnableBatchProcessing
public class NotificationFileProcessUploadedFileJob {


    @Value("${expected.snid.header}")
    public String snidHeader;

    @Value("${num.of.processing.chunks.per.file}")
    public int numOfProcessingChunksPerFile;

    @Autowired
    private InfrastructureConfigurationConfig infrastructureConfigurationConfig;

    private static final String OVERRIDDEN_BY_EXPRESSION = null;


    @Inject
    private JobBuilderFactory jobs;

    @Inject
    private StepBuilderFactory stepBuilderFactory;

    @Inject
    ExecutionContextPromotionListener executionContextPromotionListener;


    @Bean
    public Job processUploadedFileJob() throws Exception {
        return this.jobs.get("processUploadedFileJob").start(processSnidUploadedFileStep()).build();

    }

    @Bean
    public Step processSnidUploadedFileStep() {
        return stepBuilderFactory.get("processSnidFileStep")
                .<PushItemDTO, PushItemDTO>chunk(numOfProcessingChunksPerFile)
                .reader(snidFileReader(OVERRIDDEN_BY_EXPRESSION))
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
            //    .throttleLimit(20)
             //   .taskJobExecutor(infrastructureConfigurationConfig.taskJobExecutor())


                        //     .faultTolerant()
                        //   .skipLimit(10) //default is set to 0
                        //     .skip(MySQLIntegrityConstraintViolationException.class)
                .build();
    }

    @Inject
    ItemWriter writer;

    @Bean
    public AsyncItemWriter asyncItemWriter() {
        AsyncItemWriter asyncItemWriter=new AsyncItemWriter();
        asyncItemWriter.setDelegate(writer);
        return asyncItemWriter;
    }


    @Bean
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemStreamReader<PushItemDTO> snidFileReader(@Value("#{jobParameters[filePath]}") String filePath) {
        FlatFileItemReader<PushItemDTO> itemReader = new FlatFileItemReader<PushItemDTO>();
        itemReader.setLineMapper(snidLineMapper());
        itemReader.setLinesToSkip(1);
        itemReader.setResource(new FileSystemResource(filePath));
        return itemReader;
    }


    @Bean
    public AsyncItemProcessor asyncItemProcessor() {

        AsyncItemProcessor<PushItemDTO, PushItemDTO> asyncItemProcessor = new AsyncItemProcessor();

        asyncItemProcessor.setDelegate(processor(OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION,
                OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION));
        asyncItemProcessor.setTaskExecutor(infrastructureConfigurationConfig.taskProcessingExecutor());

        return asyncItemProcessor;

    }

    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    @Bean
    public ItemProcessor<PushItemDTO, PushItemDTO> processor(@Value("#{jobParameters[pushMessage]}") String pushMessage,
                                                             @Value("#{jobParameters[jobId]}") String jobId,
                                                             @Value("#{jobParameters[taskId]}") String taskId,
                                                             @Value("#{jobParameters[refId]}") String refId,
                                                             @Value("#{jobParameters[url]}") String url,
                                                             @Value("#{jobParameters[targetType]}") String targetType,
                                                             @Value("#{jobParameters[gameType]}") String gameType) {
        return new PushItemProcessor(pushMessage, jobId, taskId, refId, url, targetType, gameType);
    }

    @Bean
    public LineMapper<PushItemDTO> snidLineMapper() {
        DefaultLineMapper<PushItemDTO> lineMapper = new DefaultLineMapper<PushItemDTO>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(true);
        String[] splittedHeader = snidHeader.split(",");
        lineTokenizer.setNames(splittedHeader);
        BeanWrapperFieldSetMapper<PushItemDTO> fieldSetMapper = new BeanWrapperFieldSetMapper<PushItemDTO>();
        fieldSetMapper.setTargetType(PushItemDTO.class);

        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(new PushItemFieldSetMapper());
        return lineMapper;
    }
}


    // Presumably defined in the InfrastructureConfigurationConfig class injected above
    // and referenced as infrastructureConfigurationConfig.taskProcessingExecutor().
    @Bean
    @Override
    public SimpleAsyncTaskExecutor taskProcessingExecutor() {
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
        simpleAsyncTaskExecutor.setConcurrencyLimit(300);
        return simpleAsyncTaskExecutor;
    }

How do you think I could improve the processing performance and make it faster? Thank you.

ItemWriter code:

 @Bean
    public ItemWriter writer() {
        return new KafkaWriter();
    }


public class KafkaWriter implements ItemWriter<PushItemDTO> {


    private static final Logger logger = LoggerFactory.getLogger(KafkaWriter.class);

    @Autowired
    KafkaProducer kafkaProducer;

    @Override
    public void write(List<? extends PushItemDTO> items) throws Exception {

        for (PushItemDTO item : items) {
            try {
                logger.debug("Writing to kafka=" + item);
                sendMessageToKafka(item);
            } catch (Exception e) {
                logger.error("Error writing item=" + item.toString(), e);
            }
        }
    }

    // sendMessageToKafka(PushItemDTO item) is not shown in the original post.
}

Answer

Increasing your commit count is where I'd begin. Keep in mind what the commit count means. Since you have it set to 1, you are doing the following for each item:

  1. Start a transaction
  2. Read an item
  3. Process the item
  4. Write the item
  5. Update the job repository
  6. Commit the transaction

Your configuration doesn't show what the delegate ItemWriter is, so I can't tell, but at a minimum you are executing multiple SQL statements per item just to update the job repository.

You are correct that the FlatFileItemReader is not thread-safe. However, you aren't using multiple threads to read, only to process, so from what I can see there is no reason to set the commit count to 1.
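To illustrate that advice, here is a minimal sketch of the same step with a larger commit interval. The value 1000 is only an illustrative starting point (the answer does not prescribe a number), and the reader, processor, and writer beans are assumed to stay exactly as defined in the question:

    @Bean
    public Step processSnidUploadedFileStep() {
        // The FlatFileItemReader still runs on a single thread, but a larger
        // chunk groups many items into each transaction, so the job repository
        // is updated once per chunk instead of once per item. The
        // AsyncItemProcessor keeps fanning processing out to its task executor,
        // and the AsyncItemWriter unwraps the resulting Futures chunk by chunk.
        return stepBuilderFactory.get("processSnidFileStep")
                .<PushItemDTO, PushItemDTO>chunk(1000) // illustrative; tune to your data
                .reader(snidFileReader(OVERRIDDEN_BY_EXPRESSION))
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

In the question's configuration the same effect can be had by simply raising the num.of.processing.chunks.per.file property that drives numOfProcessingChunksPerFile.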
