从Spring Batch处理器调用Async REST API [英] Calling Async REST api from spring batch processor
问题描述
我写了一个春季批处理作业来处理列表清单.
I wrote a spring batch job that processes List of Lists.
阅读器返回列表列表. 处理器对每个ListItem进行处理,并返回处理后的List. Writer从列表列表中将内容写入DB和sftp.
Reader returns List of List. Processor works on each ListItem and returns processed List. Writer writes stuff to DB and sftp from List of List.
我有一个用例,其中我从Spring Batch处理器调用Async REST api. 在ListenableFuture响应上,我实现了LitenableFutureCallback来处理成功和失败,这可以按预期工作,但是在异步调用返回某些内容之前,ItemProcessor不必等待异步api的回调并将object(List)返回给writer.
I have a use case where I call Async REST api from spring batch processor. On ListenableFuture response I implemented LitenableFutureCallback to handle success and failure, which works as expected, but before the async call returns something, ItemProcessor dosen't wait for call backs from async api and returns object(List) to writer.
我不确定如何实现和处理来自ItemProcessor的异步调用.
I am not sure how to implement and handle async calls from ItemProcessor.
我确实读过有关AsyncItemProcessor和AsyncItemWriter的信息,但是我不确定在这种情况下是否应该使用这种东西.
I did read about AsyncItemProcessor and AsyncItemWriter, but I am not sure if that is something I should use in this scenario.
我还考虑过在AsyncRestTemplate的ListenableFuture响应上调用get(),但是根据文档,它将阻塞当前线程,直到接收到响应为止.
I also thought of calling get() on ListenableFuture response from AsyncRestTemplate, but as per documentation it will block the current thread until it receives the response.
我正在寻求有关如何实施此方法的帮助.下面的代码段:
I am seeking for some help on how should I implement this. Code snippet below:
处理器:
public class MailDocumentProcessor implements ItemProcessor<List<MailingDocsEntity>, List<MailingDocsEntity>> {
... Initialization code
@Override
public List<MailingDocsEntity> process(List<MailingDocsEntity> documentsList) throws Exception {
logger.info("Entering MailingDocsEntity processor");
List<MailingDocsEntity> synchronizedList = Collections.synchronizedList(documentsList);
for (MailingDocsEntity mailingDocsEntity : synchronizedList) {
System.out.println("Reading Mailing id: " + mailingDocsEntity.getMailingId());
..code to get the file
//If the file is not a pdf convert it
String fileExtension = readFromSpResponse.getFileExtension();
String fileName = readFromSpResponse.getFileName();
byte[] fileBytes = readFromSpResponse.getByteArray();
try {
//Do checks to make sure PDF file is being sent
if (!"pdf".equalsIgnoreCase(fileExtension)) {
//Only doc, docx and xlsx conversions are supported
...Building REquest object
//make async call to pdf conversion service
pdfService.convertDocxToPdf(request, mailingDocsEntity);
} else {
logger.error("The file cannot be converted to a pdf.\n"
);
}
}
} catch (Exception ex){
logger.error("There has been an exception while processing data", ex);
}
}
return synchronizedList;
}
}
异步PdfConversion服务类:
Async PdfConversion Service Class:
@Service
public class PdfService{
@Autowired
@Qualifier("MicroServiceAsyncRestTemplate")
AsyncRestTemplate microServiceAsyncRestTemplate;
public ConvertDocxToPdfResponse convertDocxToPdf(ConvertDocxToPdfRequest request, MailingDocsEntity mailingDocsEntity){
ConvertDocxToPdfResponse pdfResponse = new ConvertDocxToPdfResponse();
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<?> entity = new HttpEntity<>(request, headers);
ListenableFuture<ResponseEntity<ConvertDocxToPdfResponse>> microServiceResponse = microServiceAsyncRestTemplate.postForEntity(batchMailProcessingConfiguration.getPdfUrl(), entity, ConvertDocxToPdfResponse.class);
ConvertDocxToPdfResponse resultBody = microServiceResponse.get().getBody();
microServiceResponse.addCallback(new ListenableFutureCallback<ResponseEntity<ConvertDocxToPdfResponse>>() {
@Override
public void onSuccess(ResponseEntity<ConvertDocxToPdfResponse> result) {
...code to do stuff on success
}
@Override
public void onFailure(Throwable ex) {
pdfResponse.setMessage("Exception while retrieving response");
}
});
} catch (Exception e) {
String message = "There has been an error while issuing a pdf generate request to the pdf micro service";
pdfResponse.setMessage(message);
logger.error(message, e);
}
return pdfResponse;
}
}
我原来的批处理作业是同步的,我正在转换为异步以加快处理速度. 我确实尝试寻找类似的问题,但是找不到足够的信息. 任何指针或帮助都将受到高度赞赏.
My original batch job was synchronous, i am converting to Async to faster processing. I did try to look for similar questions, but could not find enough information. Any pointers or help is highly appreciated.
谢谢!
推荐答案
我确实读过有关AsyncItemProcessor和AsyncItemWriter的文章,但是我不确定在这种情况下是否应该使用这种东西.
I did read about AsyncItemProcessor and AsyncItemWriter, but I am not sure if that is something I should use in this scenario.
是的,AsyncItemProcessor
和AsyncItemWriter
适合您的用例. AsyncItemProcessor
将为新线程上的项目执行委托ItemProcessor
的逻辑(您的休息呼叫).项目完成后,结果的Future
将传递到AsynchItemWriter
进行写入.然后AsynchItemWriter
将打开Future
的包装并写入项目.这些组件的优点是您不必自己处理Future
的包装,展开等.
Yes, AsyncItemProcessor
and AsyncItemWriter
are suitable for your use case. The AsyncItemProcessor
will execute the logic (your rest call) of the delegate ItemProcessor
for an item on a new thread. Once the item completes, the Future
of the result is passed to the AsynchItemWriter
to be written. The AsynchItemWriter
will then unwrap the Future
and write the item. The advantage of these components is that you don't have to deal with Future
s wrapping, unwrapping, etc yourself.
您可以找到:
- More details here: https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#asynchronous-processors
- An example here: https://github.com/mminella/scaling-demos/blob/master/single-jvm-demos/src/main/java/io/spring/batch/scalingdemos/asyncprocessor/AsyncProcessorJobApplication.java
希望这会有所帮助.
这篇关于从Spring Batch处理器调用Async REST API的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!