从Spring Batch处理器调用Async REST API [英] Calling Async REST api from spring batch processor

查看:347
本文介绍了从Spring Batch处理器调用Async REST API的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我写了一个春季批处理作业来处理列表清单.

I wrote a spring batch job that processes List of Lists.

阅读器返回列表列表. 处理器对每个ListItem进行处理,并返回处理后的List. Writer从列表列表中将内容写入DB和sftp.

Reader returns List of List. Processor works on each ListItem and returns processed List. Writer writes stuff to DB and sftp from List of List.

我有一个用例,其中我从Spring Batch处理器调用Async REST api. 在ListenableFuture响应上,我实现了LitenableFutureCallback来处理成功和失败,这可以按预期工作,但是在异步调用返回某些内容之前,ItemProcessor不必等待异步api的回调并将object(List)返回给writer.

I have a use case where I call Async REST api from spring batch processor. On ListenableFuture response I implemented LitenableFutureCallback to handle success and failure, which works as expected, but before the async call returns something, ItemProcessor dosen't wait for call backs from async api and returns object(List) to writer.

我不确定如何实现和处理来自ItemProcessor的异步调用.

I am not sure how to implement and handle async calls from ItemProcessor.

我确实读过有关AsyncItemProcessor和AsyncItemWriter的信息,但是我不确定在这种情况下是否应该使用这种东西.

I did read about AsyncItemProcessor and AsyncItemWriter, but I am not sure if that is something I should use in this scenario.

我还考虑过在AsyncRestTemplate的ListenableFuture响应上调用get(),但是根据文档,它将阻塞当前线程,直到接收到响应为止.

I also thought of calling get() on ListenableFuture response from AsyncRestTemplate, but as per documentation it will block the current thread until it receives the response.

我正在寻求有关如何实施此方法的帮助.下面的代码段:

I am seeking for some help on how should I implement this. Code snippet below:

处理器:

public class MailDocumentProcessor implements ItemProcessor<List<MailingDocsEntity>, List<MailingDocsEntity>> {

... Initialization code

@Override
public List<MailingDocsEntity> process(List<MailingDocsEntity> documentsList) throws Exception {
    logger.info("Entering MailingDocsEntity processor");


    List<MailingDocsEntity> synchronizedList = Collections.synchronizedList(documentsList);


    for (MailingDocsEntity mailingDocsEntity : synchronizedList) {
        System.out.println("Reading Mailing id: " + mailingDocsEntity.getMailingId());

       ..code to get the file

         //If the file is not a pdf convert it
         String fileExtension = readFromSpResponse.getFileExtension();
         String fileName = readFromSpResponse.getFileName();
         byte[] fileBytes = readFromSpResponse.getByteArray();

         try {

             //Do checks to make sure PDF file is being sent
             if (!"pdf".equalsIgnoreCase(fileExtension)) {
                 //Only doc, docx and xlsx conversions are supported

                     ...Building REquest object
                     //make async call to pdf conversion service
            pdfService.convertDocxToPdf(request, mailingDocsEntity);

                 } else {
                     logger.error("The file cannot be converted to a pdf.\n"
                        );

                 }
             }


         } catch (Exception ex){
             logger.error("There has been an exception while processing data", ex);

         }

    }
    return synchronizedList;
}

}

异步PdfConversion服务类:

Async PdfConversion Service Class:

@Service
public class PdfService{


   @Autowired
   @Qualifier("MicroServiceAsyncRestTemplate")
   AsyncRestTemplate microServiceAsyncRestTemplate;

   public ConvertDocxToPdfResponse convertDocxToPdf(ConvertDocxToPdfRequest request, MailingDocsEntity mailingDocsEntity){

        ConvertDocxToPdfResponse pdfResponse = new ConvertDocxToPdfResponse();


            try {

                HttpHeaders headers = new HttpHeaders();
                headers.setContentType(MediaType.APPLICATION_JSON);

                HttpEntity<?> entity = new HttpEntity<>(request, headers);



                ListenableFuture<ResponseEntity<ConvertDocxToPdfResponse>> microServiceResponse = microServiceAsyncRestTemplate.postForEntity(batchMailProcessingConfiguration.getPdfUrl(), entity, ConvertDocxToPdfResponse.class);

                ConvertDocxToPdfResponse resultBody = microServiceResponse.get().getBody();
                microServiceResponse.addCallback(new ListenableFutureCallback<ResponseEntity<ConvertDocxToPdfResponse>>()  {

                    @Override
                    public void onSuccess(ResponseEntity<ConvertDocxToPdfResponse> result) {
                        ...code to do stuff on success


                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        pdfResponse.setMessage("Exception while retrieving response");

                    }
                });

            } catch (Exception e) {
                String message = "There has been an error while issuing a pdf generate request to the pdf micro service";
                pdfResponse.setMessage(message);
                logger.error(message, e);
            }


        return pdfResponse;
    }

}

我原来的批处理作业是同步的,我正在转换为异步以加快处理速度. 我确实尝试寻找类似的问题,但是找不到足够的信息. 任何指针或帮助都将受到高度赞赏.

My original batch job was synchronous, i am converting to Async to faster processing. I did try to look for similar questions, but could not find enough information. Any pointers or help is highly appreciated.

谢谢!

推荐答案

我确实读过有关AsyncItemProcessor和AsyncItemWriter的文章,但是我不确定在这种情况下是否应该使用这种东西.

I did read about AsyncItemProcessor and AsyncItemWriter, but I am not sure if that is something I should use in this scenario.

是的,AsyncItemProcessorAsyncItemWriter适合您的用例. AsyncItemProcessor将为新线程上的项目执行委托ItemProcessor的逻辑(您的休息呼叫).项目完成后,结果的Future将传递到AsynchItemWriter进行写入.然后AsynchItemWriter将打开Future的包装并写入项目.这些组件的优点是您不必自己处理Future的包装,展开等.

Yes, AsyncItemProcessor and AsyncItemWriter are suitable for your use case. The AsyncItemProcessor will execute the logic (your rest call) of the delegate ItemProcessor for an item on a new thread. Once the item completes, the Future of the result is passed to the AsynchItemWriter to be written. The AsynchItemWriter will then unwrap the Future and write the item. The advantage of these components is that you don't have to deal with Futures wrapping, unwrapping, etc yourself.

您可以找到:

  • 此处有更多详细信息:
  • More details here: https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#asynchronous-processors
  • An example here: https://github.com/mminella/scaling-demos/blob/master/single-jvm-demos/src/main/java/io/spring/batch/scalingdemos/asyncprocessor/AsyncProcessorJobApplication.java

希望这会有所帮助.

这篇关于从Spring Batch处理器调用Async REST API的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆