如何使用 kafka 集成在 spring 批处理中调用 StepExcecutionListener? [英] How to call StepExcecutionListener in spring batch with kafka integration?

查看:22
本文介绍了如何使用 kafka 集成在 spring 批处理中调用 StepExcecutionListener?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

下面是etl.xml中job的配置

Below is the config of job in etl.xml

<batch:job id="procuerJob">

<batch:step id="Produce">

    <batch:partition partitioner="partitioner">

        <batch:handler grid-size="${ partitioner.limit}"></batch:handler>

        <batch:step>

            <batch:tasklet>

                <batch:chunk reader="Reader" writer="kafkaProducer"
                             commit-interval="20000">

                </batch:chunk>

                <batch:listeners>

                    <batch:listener ref="producingListener" />

                    
                </batch:listeners>

            </batch:tasklet>

        </batch:step>

    </batch:partition>

</batch:step>

</batch:job>

</batch:job>

下面是用于向主题发送消息的代码.

below is the code used to send messaged to the topic.

ListenableFuture>listenableFuture = kafkaTemplate.send(message);

ListenableFuture<SendResult<String, message>> listenableFuture = kafkaTemplate.send(message);

listenableFuture.addCallback(new ListenableFutureCallback() {

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, message >>() {

@Override
public void onSuccess(SendResult<String, message > result) {
    log.info("marking as SUCCESS");
    manager.updateStatus("someTable", KafkaResponse.SUCCESS);
}

@Override
public void onFailure(Throwable ex) {
    log.info("marking as FAILURE");
    manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}

}

一旦 kafkaTemplate.send(message) 被执行,监听器就会被调用并完成任务.我看到了onSuccess(), onFailure() 在作业完成后被调用.如何更改作业的配置,以便在收到 kafka 主题的确认后调用侦听器?

Once the kafkaTemplate.send(message)is executed , the listener is called and the job completes. I see the onSuccess(), onFailure() are called post the job is completed. How can I chnage the config of job so that listener is called after receiving the acknowledgement from kafka topic?

推荐答案

您是否想要一些代码示例,说明您建议阻止等待未来的内容.这可能会有所帮助.

Would you like to some code example of what you suggested to block waiting for future. That might be of help.

我没有尝试以下方法,但我的想法是:

I did not try the following but here is the idea:

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(message);
try {
    SendResult<String, Message> sendResult = future.get();
    // inspect sendResult
    log.info("marking as SUCCESS");
    manager.updateStatus("someTable", KafkaResponse.SUCCESS);
} catch (Exception e) {
     log.info("marking as FAILURE");
     manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
     // do something with e
}

这篇关于如何使用 kafka 集成在 spring 批处理中调用 StepExcecutionListener?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆