Handling bad messages using Kafka's Streams API

Question

I have a basic stream processing flow which looks like

master topic -> my processing in a mapper/filter -> output topics

and I am wondering about the best way to handle "bad messages". This could potentially be things like messages that I can't deserialize properly, or perhaps the processing/filtering logic fails in some unexpected way (I have no external dependencies so there should be no transient errors of that sort).

I was considering wrapping all my processing/filtering code in a try catch and if an exception was raised then routing to an "error topic". Then I can study the message and modify it or fix my code as appropriate and then replay it on to master. If I let any exceptions propagate, the stream seems to get jammed and no more messages are picked up.

  • Is this approach considered best practice?
  • Is there a convenient Kafka Streams way to handle this? I don't think there is a concept of a DLQ...
  • What are the alternative ways to stop Kafka jamming on a "bad message"?
  • What alternative error handling approaches are there?

For completeness here is my code (pseudo-ish):

class Document {
    // Fields
}

class AnalysedDocument {

    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() {
         @Override
         public AnalysedDocument apply(String rawValue) {
             Document document;
             try {
                 // Deserialise
                 document = ...
             } catch (Exception e) {
                 return new AnalysedDocument(rawValue, e);
             }
             try {
                 // Perform analysis
                 Analysis analysis = ...
                 return new AnalysedDocument(document, analysis);
             } catch (Exception e) {
                 return new AnalysedDocument(document, e);
             }
         }
    });

// Branch based on whether analysis mapping failed to produce errorStream and successStream (see the sketch after this block)
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
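
The branching step itself is only hinted at by the comment above; one way to fill it in (a sketch, assuming the KStream#branch(Predicate...) API of this Kafka Streams version and that a failed record is marked by a non-null exception field on AnalysedDocument) would be roughly:

KStream<String, AnalysedDocument>[] branches = analysedDocumentStream.branch(
    (key, doc) -> doc.exception != null,   // deserialisation or analysis failed
    (key, doc) -> true                     // everything else
);
KStream<String, AnalysedDocument> errorStream = branches[0];
KStream<String, AnalysedDocument> successStream = branches[1];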

Any help greatly appreciated.

Answer

Right now, Kafka Streams offers only limited error handling capabilities. There is work in progress to simplify this. For now, your overall approach seems to be a good way to go.

One comment about handling de/serialization errors: handling those errors manually requires you to do the de/serialization "manually" as well. This means you need to configure ByteArraySerdes for the key and value of your Streams app's input/output topics and add a map() that does the de/serialization (i.e., KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType> -- or the other way round if you also want to catch serialization exceptions). Otherwise, you cannot try-catch deserialization exceptions.

With your current approach, you "only" validate that the given string represents a valid document -- but it could be the case that the message itself is corrupted and cannot be converted into a String in the source operator in the first place. Thus, you don't actually cover deserialization exceptions with your code. However, if you are sure a deserialization exception can never happen, your approach would be sufficient, too.
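
To make that pattern concrete, here is a minimal sketch that reuses the "master" topic and the AnalysedDocument type from the question; deserialiseDocument() and analyse() are hypothetical helpers standing in for the real deserialisation and analysis logic:

KStreamBuilder builder = new KStreamBuilder();

// Consume raw bytes so that nothing is deserialized in the source operator ...
KStream<byte[], byte[]> rawStream =
    builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "master");

// ... and do the deserialization explicitly inside mapValues(), where it can be caught.
KStream<byte[], AnalysedDocument> analysedStream = rawStream.mapValues(rawBytes -> {
    try {
        Document document = deserialiseDocument(rawBytes);          // hypothetical helper
        return new AnalysedDocument(document, analyse(document));   // hypothetical helper
    } catch (Exception e) {
        // Even a message that is not valid UTF-8/JSON ends up here, because the
        // source operator never tried to interpret the bytes.
        return new AnalysedDocument(new String(rawBytes), e);
    }
});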

Update

This issue is tackled via KIP-161 and will be included in the next release, 1.0.0. It allows you to register a callback via the parameter default.deserialization.exception.handler. The handler is invoked every time an exception occurs during deserialization and allows you to return a DeserializationHandlerResponse (CONTINUE -> drop the record and move on, or FAIL, which is the default).
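
For illustration, a minimal sketch against the released 1.0.0 API (LogAndContinueExceptionHandler and the DeserializationExceptionHandler interface live in org.apache.kafka.streams.errors; the custom handler shown second is a hypothetical example):

// Option 1: use the built-in handler that logs the poison pill and carries on.
// (config is the Properties object passed to new KafkaStreams(...).)
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
           LogAndContinueExceptionHandler.class);

// Option 2: implement the callback yourself, e.g. to log or count skipped records.
public class SkipAndLogExceptionHandler implements DeserializationExceptionHandler {
    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        // record.topic(), record.partition(), and record.offset() identify the bad message
        return DeserializationHandlerResponse.CONTINUE;   // drop the record and move on
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}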

Update 2

With KIP-210 (part of Kafka 1.1) it's also possible to handle errors on the producer side, similar to the consumer part, by registering a ProductionExceptionHandler via the config default.production.exception.handler; the handler can return CONTINUE.
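
Again for illustration, a sketch against the Kafka 1.1 API (ProductionExceptionHandler lives in org.apache.kafka.streams.errors; the handler below, which skips records that are too large to be produced, is just an example policy):

public class ContinueOnRecordTooLargeHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;   // skip this record
        }
        return ProductionExceptionHandlerResponse.FAIL;           // otherwise stop (default)
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}

// Registered the same way as the deserialization handler:
config.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
           ContinueOnRecordTooLargeHandler.class);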
