How to make Spring Cloud Stream Kafka Streams binder retry processing a message if a failure occurs during the processing step?

Problem Description

I am working on Kafka Streams using Spring Cloud Stream. In the message-processing application there is a chance that an error will occur, in which case the message should not be committed and should be retried.

My application method:

@Bean
public Function<KStream<Object, String>, KStream<String, Long>> process() {
  return input -> {
    // Split each value into upper-case words and count them per key
    // in 500 ms time windows.
    KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\\W+")));
    KGroupedStream<String, String> kgt = kt.map((k, v) -> new KeyValue<>(v, v))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
    KTable<Windowed<String>, Long> ktable = kgt.windowedBy(TimeWindows.of(500)).count();

    KStream<String, WordCount> kst = ktable.toStream().map((k, v) -> {
      WordCount wc = new WordCount();
      wc.setWord(k.key());
      wc.setCount(v);
      wc.setStart(new Date(k.window().start()));
      wc.setEnd(new Date(k.window().end()));

      // This call may fail; the question is how to retry it.
      dao.insert(wc);

      return new KeyValue<>(k.key(), wc);
    });

    return kst.map((k, v) -> new KeyValue<>(k, v.getCount()));
  };
}

Here, if the DAO insert method fails, the message should not be published to the output topic and processing of the same message should be retried.

How can we configure the Kafka Streams binder to do this? Any help regarding this is much appreciated.

Recommended Answer

The Spring Cloud Stream Kafka Streams binder itself does not provide such a retry mechanism within the execution of your business logic. However, one way to solve this use case is to wrap your critical call (dao.insert() in this case) in a RetryTemplate that you define locally. Here is a possible implementation that retries 10 times with a fixed back-off of 1 second. If you try this solution, make sure to extract the RetryTemplate-related common code out of your main business logic. I haven't tried this, but it should work.

KStream<String, WordCount> kst = ktable.toStream().map((k, v) -> {
  WordCount wc = new WordCount();
  ...

  // Build a RetryTemplate (org.springframework.retry.support.RetryTemplate)
  // that retries up to 10 times with a fixed 1-second back-off.
  RetryTemplate retryTemplate = new RetryTemplate();

  RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
  FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
  backOffPolicy.setBackOffPeriod(1000);

  retryTemplate.setBackOffPolicy(backOffPolicy);
  retryTemplate.setRetryPolicy(retryPolicy);

  retryTemplate.execute(context -> {
    try {
      dao.insert(wc);
    }
    catch (Exception e) {
      // Rethrow so the RetryTemplate schedules another attempt
      throw new IllegalStateException(e);
    }
    return null;
  });

  return new KeyValue<>(k.key(), wc);
});
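
The answer above advises extracting the RetryTemplate-related code out of the business logic. One way to do that, shown as a minimal sketch (the RetryConfig class and the daoRetryTemplate bean name are illustrative, not part of the original answer), is to define the template once as a Spring bean and inject it into the class that declares process():

// Illustrative configuration class; the bean name "daoRetryTemplate" is an assumption.
@Configuration
public class RetryConfig {

  @Bean
  public RetryTemplate daoRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();

    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000);

    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(10));
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return retryTemplate;
  }
}

The processor body then shrinks to a single daoRetryTemplate.execute(...) call around dao.insert(wc), keeping the stream topology free of retry plumbing.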

Even after retrying the DAO insert operation 10 times, if it still fails, the exception is thrown and terminates the application, in which case the offset will not be committed. On restart, after fixing the underlying issue, your application should continue from this offset.
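
If terminating the application once all attempts are exhausted is not what you want, Spring Retry also offers a two-argument execute(RetryCallback, RecoveryCallback) overload. The sketch below is an assumption layered on top of the answer, not part of it: it logs the failure (log is a hypothetical SLF4J logger) and lets processing continue, whereas rethrowing from the recovery callback would restore the stop-the-application behaviour described above:

retryTemplate.execute(context -> {
  try {
    dao.insert(wc);
  }
  catch (Exception e) {
    throw new IllegalStateException(e);
  }
  return null;
}, context -> {
  // RecoveryCallback: invoked only after every retry attempt has failed.
  log.error("Giving up on insert after {} attempts", context.getRetryCount(), context.getLastThrowable());
  return null;
});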
