How to optimize performance using FlatFileItemReader and asynchronous processors
Problem description
I have a simple CSV file with ~400,000 lines (one column only). Reading the records and processing them takes a long time.
The processor validates each record against Couchbase.
The writer writes into a remote topic. The whole job takes around 30 minutes. That's insane.
I read that FlatFileItemReader is not thread safe, so my chunk value is 1.
I read that asynchronous processing could help, but I can't see any improvement.
This is my code:
@Configuration
@EnableBatchProcessing
public class NotificationFileProcessUploadedFileJob {

    @Value("${expected.snid.header}")
    public String snidHeader;

    @Value("${num.of.processing.chunks.per.file}")
    public int numOfProcessingChunksPerFile;

    @Autowired
    private InfrastructureConfigurationConfig infrastructureConfigurationConfig;

    private static final String OVERRIDDEN_BY_EXPRESSION = null;

    @Inject
    private JobBuilderFactory jobs;

    @Inject
    private StepBuilderFactory stepBuilderFactory;

    @Inject
    ExecutionContextPromotionListener executionContextPromotionListener;

    @Bean
    public Job processUploadedFileJob() throws Exception {
        return this.jobs.get("processUploadedFileJob").start(processSnidUploadedFileStep()).build();
    }

    @Bean
    public Step processSnidUploadedFileStep() {
        return stepBuilderFactory.get("processSnidFileStep")
                .<PushItemDTO, PushItemDTO>chunk(numOfProcessingChunksPerFile)
                .reader(snidFileReader(OVERRIDDEN_BY_EXPRESSION))
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                // .throttleLimit(20)
                // .taskJobExecutor(infrastructureConfigurationConfig.taskJobExecutor())
                // .faultTolerant()
                // .skipLimit(10) // default is 0
                // .skip(MySQLIntegrityConstraintViolationException.class)
                .build();
    }

    @Inject
    ItemWriter writer;

    @Bean
    public AsyncItemWriter asyncItemWriter() {
        AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
        asyncItemWriter.setDelegate(writer);
        return asyncItemWriter;
    }

    @Bean
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemStreamReader<PushItemDTO> snidFileReader(@Value("#{jobParameters[filePath]}") String filePath) {
        FlatFileItemReader<PushItemDTO> itemReader = new FlatFileItemReader<PushItemDTO>();
        itemReader.setLineMapper(snidLineMapper());
        itemReader.setLinesToSkip(1);
        itemReader.setResource(new FileSystemResource(filePath));
        return itemReader;
    }

    @Bean
    public AsyncItemProcessor asyncItemProcessor() {
        AsyncItemProcessor<PushItemDTO, PushItemDTO> asyncItemProcessor = new AsyncItemProcessor();
        asyncItemProcessor.setDelegate(processor(OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION,
                OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION, OVERRIDDEN_BY_EXPRESSION));
        asyncItemProcessor.setTaskExecutor(infrastructureConfigurationConfig.taskProcessingExecutor());
        return asyncItemProcessor;
    }

    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    @Bean
    public ItemProcessor<PushItemDTO, PushItemDTO> processor(@Value("#{jobParameters[pushMessage]}") String pushMessage,
                                                             @Value("#{jobParameters[jobId]}") String jobId,
                                                             @Value("#{jobParameters[taskId]}") String taskId,
                                                             @Value("#{jobParameters[refId]}") String refId,
                                                             @Value("#{jobParameters[url]}") String url,
                                                             @Value("#{jobParameters[targetType]}") String targetType,
                                                             @Value("#{jobParameters[gameType]}") String gameType) {
        return new PushItemProcessor(pushMessage, jobId, taskId, refId, url, targetType, gameType);
    }

    @Bean
    public LineMapper<PushItemDTO> snidLineMapper() {
        DefaultLineMapper<PushItemDTO> lineMapper = new DefaultLineMapper<PushItemDTO>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(true);
        String[] splittedHeader = snidHeader.split(",");
        lineTokenizer.setNames(splittedHeader);
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(new PushItemFieldSetMapper());
        return lineMapper;
    }
}
// In InfrastructureConfigurationConfig:
@Bean
@Override
public SimpleAsyncTaskExecutor taskProcessingExecutor() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setConcurrencyLimit(300);
    return simpleAsyncTaskExecutor;
}
How do you think I could improve the processing performance and make it faster? Thank you.
ItemWriter code:
@Bean
public ItemWriter writer() {
    return new KafkaWriter();
}

public class KafkaWriter implements ItemWriter<PushItemDTO> {

    private static final Logger logger = LoggerFactory.getLogger(KafkaWriter.class);

    @Autowired
    KafkaProducer kafkaProducer;

    @Override
    public void write(List<? extends PushItemDTO> items) throws Exception {
        for (PushItemDTO item : items) {
            try {
                logger.debug("Writing to kafka=" + item);
                sendMessageToKafka(item);
            } catch (Exception e) {
                logger.error("Error writing item=" + item.toString(), e);
            }
        }
    }
}
Answer
Increasing your commit count is where I'd begin. Keep in mind what the commit count means. Since you have it set to 1, you are doing the following for each item:
- Start a transaction
- Read an item
- Process the item
- Write the item
- Update the job repository
- Commit the transaction
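The cost of that per-item cycle adds up quickly over 400,000 lines. A rough back-of-the-envelope sketch, where the 2 ms per-commit overhead is an illustrative assumption rather than a measurement:

```java
public class CommitOverhead {

    // Estimated seconds of pure transaction/job-repository overhead for a
    // given item count, commit interval, and per-commit cost in milliseconds.
    public static double overheadSeconds(int items, int commitInterval, double msPerCommit) {
        int commits = (items + commitInterval - 1) / commitInterval; // ceiling division
        return commits * msPerCommit / 1000.0;
    }

    public static void main(String[] args) {
        // 400,000 commits at 2 ms each: 800 s, over 13 minutes of pure overhead
        System.out.println(overheadSeconds(400_000, 1, 2.0));
        // 400 commits at 2 ms each: under a second of overhead
        System.out.println(overheadSeconds(400_000, 1_000, 2.0));
    }
}
```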
Your configuration doesn't show what the delegate ItemWriter is, so I can't tell, but at a minimum you are executing multiple SQL statements per item to update the job repository.
You are correct that the FlatFileItemReader is not thread safe. However, you aren't using multiple threads to read, only to process, so there is no reason to set the commit count to 1 from what I can see.
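Concretely, the step from the question could raise the commit interval. A sketch using the question's bean names (1000 is only a starting point to tune; note that with AsyncItemProcessor the step's output type is strictly Future&lt;PushItemDTO&gt;):

```java
@Bean
public Step processSnidUploadedFileStep() {
    return stepBuilderFactory.get("processSnidFileStep")
            // one transaction and one job-repository update per 1000 items,
            // instead of per single item
            .<PushItemDTO, Future<PushItemDTO>>chunk(1000)
            .reader(snidFileReader(OVERRIDDEN_BY_EXPRESSION))
            .processor(asyncItemProcessor())
            .writer(asyncItemWriter())
            .build();
}
```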