如何在具有Kafka集成的Spring Batch中调用StepExcecutionListener? [英] How to call StepExcecutionListener in spring batch with kafka integration?

查看:172
本文介绍了如何在具有Kafka集成的Spring Batch中调用StepExcecutionListener?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

下面是etl.xml中job的配置

Below is the config of job in etl.xml

< batch:job id ="procuerJob">

<batch:job id="procuerJob">

<batch:step id="Produce">

    <batch:partition partitioner="partitioner">

        <batch:handler grid-size="${ partitioner.limit}"></batch:handler>

        <batch:step>

            <batch:tasklet>

                <batch:chunk reader="Reader" writer="kafkaProducer"
                             commit-interval="20000">

                </batch:chunk>

                <batch:listeners>

                    <batch:listener ref="producingListener" />

                    
                </batch:listeners>

            </batch:tasklet>

        </batch:step>

    </batch:partition>

</batch:step>

</batch:job>

</batch:job>

下面是用于向该主题发送消息的代码.

below is the code used to send messaged to the topic.

ListenableFuture< SendResult<字符串,消息>> listenableFuture = kafkaTemplate.send(message);

ListenableFuture<SendResult<String, message>> listenableFuture = kafkaTemplate.send(message);

listenableFuture.addCallback(new ListenableFutureCallback< SendResult< String,message>>(){

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, message >>() {

@Override
public void onSuccess(SendResult<String, message > result) {
    log.info("marking as SUCCESS");
    manager.updateStatus("someTable", KafkaResponse.SUCCESS);
}

@Override
public void onFailure(Throwable ex) {
    log.info("marking as FAILURE");
    manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}

}

执行kafkaTemplate.send(message)后,将调用侦听器并完成作业.我看到了 在作业完成后调用onSuccess(),onFailure(). 我如何更改job的配置,以便在收到来自kafka主题的确认后调用侦听器?

Once the kafkaTemplate.send(message)is executed , the listener is called and the job completes. I see the onSuccess(), onFailure() are called post the job is completed. How can I chnage the config of job so that listener is called after receiving the acknowledgement from kafka topic?

推荐答案

您是否想举一些示例代码,说明您建议阻止等待未来.可能会有帮助.

Would you like to some code example of what you suggested to block waiting for future. That might be of help.

我没有尝试以下方法,但是这里是一个主意:

I did not try the following but here is the idea:

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(message);
try {
    SendResult<String, Message> sendResult = future.get();
    // inspect sendResult
    log.info("marking as SUCCESS");
    manager.updateStatus("someTable", KafkaResponse.SUCCESS);
} catch (Exception e) {
     log.info("marking as FAILURE");
     manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
     // do something with e
}

这篇关于如何在具有Kafka集成的Spring Batch中调用StepExcecutionListener?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆