当控制去捕获块时如何停止发送到 kafka 主题功能 kafka spring [英] How to stop sending to kafka topic when control goes to catch block Functional kafka spring

查看:33
本文介绍了当控制去捕获块时如何停止发送到 kafka 主题功能 kafka spring的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

你能告诉我如何停止发送到我的第三个 kafka 主题,当控件到达 catch 块时,当前消息被发送到错误主题以及在正常情况下应该发送到的主题加工.代码片段如下:

could you please advise , how can I stop sending to my 3rd kafka topic, when the control reaches the catch block, currently the message is sent to both error topic as well as the topic to which it should send in case of normal processing. A snippet of code is as below:

@Component
public class Abc {
private final StreamBridge streamBridge;
public Abc (StreamBridge streamBridge)
this.streamBridge = streamBridge;
@Bean
public Function<KStream<String, KafkaClass>, KStream<String,KafkaClass>> hiProcess() {
return input -> input.map((key,value) -> {
try{
KafkaClass stream = processFunction();
}
catch(Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
}
return new KeyValue<>(key, stream);
})
}
}

    

推荐答案

这可以使用以下模式实现:

This can be implemented using the following pattern:

KafkaClass stream;
return input -> input
    .branch((k, v) -> {
        try {
            stream = processFunction();
            return true;
        }
        catch (Exception e) {
            Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
            streamBridge.send("errProcess-out-0". mess);
            return false;
        }
       },
       (k, v) -> true)[0]
    .map((k, v) -> new KeyValue<>(k, stream));

在这里,我们使用 KStream 的分支功能 (API) 将您的输入分成两条路径 - 正常流和导致错误的路径.这是通过为 branch 方法调用提供两个过滤器来实现的.第一个过滤器是您调用 processFunction 方法并获得响应的正常流程.如果我们没有得到异常,过滤器返回true,分支操作的结果被捕获在输出数组[0]的第一个元素中,即在 map 操作中进行下游处理,将最终结果发送到出站主题.

Here, we are using the branching feature (API) of KStream to split your input into two paths - normal flow and the one causing the errors. This is accomplished by providing two filters to the branch method call. The first filter is the normal flow in which you call the processFunction method and get a response back. If we don't get an exception, the filter returns true, and the result of the branch operation is captured in the first element of the output array [0] which is processed downstream in the map operation in which it sends the final result to the outbound topic.

另一方面,如果它抛出异常,它会使用StreamBridge 向错误主题发送任何必要的信息,并且过滤器返回false.由于下游的map 操作仅在分支[0] 的数组的第一个元素上执行,因此不会向外发送任何内容.当第一个过滤器返回 false 时,它会转到始终返回 true 的第二个过滤器.这是一个无操作过滤器,其结果将被完全忽略.

On the other hand, if it throws an exception, it sends whatever is necessary to the error topic using StreamBridge and the filter returns false. Since the downstream map operation is only performed on the first element of the array from branching [0], nothing will be sent outbound. When the first filter returns false, it goes to the second filter which always returns true. This is a no-op filter where the results are completely ignored.

此特定实现的一个缺点是您需要将来自 processFunction 的响应存储在一个实例字段中,然后在每个传入的 KStream 记录上进行变异,以便您可以访问它在您发送输出的最终 map 方法中的值.但是,对于这个特定用例,这可能不是问题.

One downside of this particular implementation is that you need to store the response from processFunction in an instance field and then mutate on each incoming KStream record so that you can access its value in the final map method where you send the output. However, for this particular use case, this may not be an issue.

这篇关于当控制去捕获块时如何停止发送到 kafka 主题功能 kafka spring的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆