Handling bad messages using Kafka's Streams API


Question

I have a basic stream processing flow which looks like

master topic -> my processing in a mapper/filter -> output topics

and I am wondering about the best way to handle "bad messages". This could potentially be things like messages that I can't deserialize properly, or perhaps the processing/filtering logic fails in some unexpected way (I have no external dependencies so there should be no transient errors of that sort).

I was considering wrapping all my processing/filtering code in a try catch and if an exception was raised then routing to an "error topic". Then I can study the message and modify it or fix my code as appropriate and then replay it on to master. If I let any exceptions propagate, the stream seems to get jammed and no more messages are picked up.

  • Is this approach considered best practice?
  • Is there a convenient Kafka Streams way to handle this? I don't think there is a concept of a DLQ...
  • What are the alternative ways to stop Kafka jamming on a "bad message"?
  • What alternative error handling approaches are there?

For completeness here is my code (pseudo-ish):

class Document {
    // Fields
}

class AnalysedDocument {

    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() {
         @Override
         public AnalysedDocument apply(String rawValue) {
             Document document;
             try {
                 // Deserialise
                 document = ...
             } catch (Exception e) {
                 return new AnalysedDocument(rawValue, e);
             }
             try {
                 // Perform analysis
                 Analysis analysis = ...
                 return new AnalysedDocument(document, analysis);
             } catch (Exception e) {
                 return new AnalysedDocument(document, e);
             }
         }
    });

// Branch into errorStream and successStream based on whether analysis failed
KStream<String, AnalysedDocument>[] branches = analysedDocumentStream.branch(
    (key, doc) -> doc.exception != null,   // errorStream
    (key, doc) -> true);                   // successStream
branches[0].to(Serdes.String(), customPojoSerde(), "error");
branches[1].to(Serdes.String(), customPojoSerde(), "analysed");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

Any help greatly appreciated.

Answer

Right now, Kafka Streams offers only limited error handling capabilities. There is work in progress to simplify this. For now, your overall approach seems to be a good way to go.

One comment about handling de/serialization errors: handling those errors manually requires you to do the de/serialization "manually" as well. This means you need to configure ByteArraySerdes for the key and value of your Streams app's input/output topics and add a map() that does the de/serialization (i.e., KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType> -- or the other way around if you also want to catch serialization exceptions). Otherwise, you cannot try-catch deserialization exceptions.

With your current approach, you "only" validate that the given string represents a valid document -- but it could be the case that the message itself is corrupted and cannot be converted into a String in the source operator in the first place. Thus, you don't actually cover deserialization exceptions with your code. However, if you are sure a deserialization exception can never happen, your approach would be sufficient, too.
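To make the byte-level pattern concrete, here is a minimal sketch of consuming raw bytes and doing the deserialization inside a try/catch, using the old KStreamBuilder API from the question. The `Document`/`AnalysedDocument` classes are cut-down stand-ins for the question's own classes, and `parseDocument` is a hypothetical parser representing the question's deserialization step:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class RawBytesExample {

    // Minimal stand-ins for the classes in the question.
    static class Document { final String text; Document(String text) { this.text = text; } }

    static class AnalysedDocument {
        final Document document; final String rawValue; final Exception exception;
        AnalysedDocument(Document d) { this.document = d; this.rawValue = null; this.exception = null; }
        AnalysedDocument(String rawValue, Exception e) { this.document = null; this.rawValue = rawValue; this.exception = e; }
    }

    // Hypothetical parser standing in for the question's deserialization step.
    static Document parseDocument(String raw) {
        if (raw == null || raw.isEmpty()) throw new IllegalArgumentException("empty payload");
        return new Document(raw);
    }

    // One record's worth of the map() logic: decode the bytes and parse, catching failures.
    static KeyValue<String, AnalysedDocument> process(byte[] keyBytes, byte[] valueBytes) {
        String key = keyBytes == null ? null : new String(keyBytes, StandardCharsets.UTF_8);
        String rawValue = valueBytes == null ? null : new String(valueBytes, StandardCharsets.UTF_8);
        try {
            return KeyValue.pair(key, new AnalysedDocument(parseDocument(rawValue)));
        } catch (Exception e) {
            // Deserialization failed -- capture the raw value instead of crashing the stream.
            return KeyValue.pair(key, new AnalysedDocument(rawValue, e));
        }
    }

    public static KStream<String, AnalysedDocument> buildStream(KStreamBuilder builder) {
        // Consume raw bytes so nothing is deserialized before our own try/catch runs.
        KStream<byte[], byte[]> raw =
            builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "master");
        return raw.map(RawBytesExample::process);
    }
}
```

From here the stream can be branched into error/success topics exactly as in the question's code.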

Update

This issue is tackled via KIP-161 and will be included in the next release, 1.0.0. It allows you to register a callback via the parameter default.deserialization.exception.handler. The handler will be invoked every time an exception occurs during deserialization, and allows you to return a DeserializationHandlerResponse (CONTINUE -> drop the record and move on, or FAIL, which is the default).
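A minimal sketch of such a handler, assuming Kafka 1.0.0 on the classpath: it logs the poison record's coordinates and tells Streams to keep going. (Kafka also ships a built-in LogAndContinueExceptionHandler with this behavior.)

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Log the bad record's coordinates and keep the application running.
public class LogAndContinueHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        System.err.printf("Dropping bad record %s-%d@%d: %s%n",
                record.topic(), record.partition(), record.offset(), exception.getMessage());
        return DeserializationHandlerResponse.CONTINUE; // drop the record and move on
    }

    @Override
    public void configure(Map<String, ?> configs) { /* nothing to configure */ }
}
```

It is registered via the StreamsConfig, e.g. `props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueHandler.class);`.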

Update 2

With KIP-210 (which will be part of Kafka 1.1) it is also possible to handle errors on the producer side, similar to the consumer part, by registering a ProductionExceptionHandler via the config default.production.exception.handler that can return CONTINUE.
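A sketch of a producer-side handler under the Kafka 1.1 API: this one skips records that are too large to produce and fails on any other error. The skip-on-oversized policy is just an illustrative choice, not something prescribed by the answer.

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Skip records that are too large to produce; fail on anything else.
public class ContinueOnTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            System.err.printf("Skipping oversized record for topic %s%n", record.topic());
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) { /* nothing to configure */ }
}
```

Registered via `props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueOnTooLargeHandler.class);`.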
