Spring Cloud Kafka Streams中的错误处理 [英] Error handling in Spring Cloud Kafka Streams

查看:19
本文介绍了Spring Cloud Kafka Streams中的错误处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是Spring Cloud Stream和Kafka Stream。假设我有一个处理器,它是一个将字符串的KStream转换为CityProgrammes的KStream的函数。它调用一个API来按名称查找City,并调用另一个转换来查找该城市附近的任何事件。

现在的问题是转换过程中发生任何错误,整个应用程序都会停止。我想把这一条特别的信息发送给DLQ,然后继续前进。我已经读了几天了,每个人都建议在被调用的服务中处理错误,但在我看来这是无关紧要的,而且我仍然需要返回一个KStream:我如何在Catch中做到这一点?

我还查看了UncaughtExeptionHandler,但它不知道该消息,并且只能重新启动处理,该处理不会跳过此无效消息。

这听起来可能像是A-B问题,因此问题被重新表述为:当发生异常时如何维护KStream中的流并将无效项发送到DLQ?

推荐答案

遇到应用程序级错误时,如何处理错误取决于应用程序本身。Kafka Streams和Spring Cloud Stream绑定器主要支持框架级别的反序列化和序列化错误。虽然情况就是这样,但我认为你的情况是可以处理的。如果你使用的是Kafka客户端2.8之前的版本,这是我之前给出的类似答案:https://stackoverflow.com/a/66749750/2070861

如果您使用的是Kafka/Streams 2.8,这里有一个您可以使用的想法。但是,下面的代码应该仅用作起点。根据您的用例进行调整。阅读更多关于分支如何在Kafka Streams 2.8中工作的内容。在2.8版中,branching API在以前版本的基础上进行了显著重构。

public Function<KStream<?, String>, KStream<?, Foo>> convert() {
            Foo[] foo = new Foo[0];
            return input -> {
                final Map<String, ? extends KStream<?, String>> branches =
                        input.split(Named.as("foo-")).branch((key, value) -> {
                                    try {
                                        foo[0] = new Foo(); // your API call for CitiProgramme converion here, possibly.
                                        return true;
                                    }
                                    catch (Exception e) {
                                        Message<?> message = MessageBuilder.withPayload(value).build();
                                        streamBridge.send("to-my-dlt", message);
                                        return false;
                                    }

                                }, Branched.as("bar"))
                                .defaultBranch();

                final KStream<?, String> kStream = branches.get("foo-bar");
                return kStream.map((key, value) -> new KeyValue<>("", foo[0]));
            };

        }


    }

此代码中忽略了默认分支,因为它只包含引发异常的记录。这些是由上面的catch语句处理的,在该语句中,我们以编程方式将记录发送到DLT。最后,我们获得好的记录,并将它们映射到新的KStream,然后通过出站发送它。

这篇关于Spring Cloud Kafka Streams中的错误处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆