Handling bad messages using Kafka's Streams API
Question
I have a basic stream processing flow which looks like
master topic -> my processing in a mapper/filter -> output topics
and I am wondering about the best way to handle "bad messages". This could potentially be things like messages that I can't deserialize properly, or perhaps the processing/filtering logic fails in some unexpected way (I have no external dependencies so there should be no transient errors of that sort).
I was considering wrapping all my processing/filtering code in a try-catch and, if an exception was raised, routing the message to an "error topic". Then I could study the message, modify it or fix my code as appropriate, and replay it onto master. If I let any exceptions propagate, the stream seems to get jammed and no more messages are picked up.
- Is this approach considered best practice?
- Is there a convenient Kafka Streams way to handle this? I don't think there is a concept of a DLQ...
- What are the alternative ways to stop Kafka jamming on a "bad message"?
- What alternative error handling approaches are there?
For completeness here is my code (pseudo-ish):
class Document {
// Fields
}
class AnalysedDocument {
Document document;
String rawValue;
Exception exception;
Analysis analysis;
// All being well
AnalysedDocument(Document document, Analysis analysis) {...}
// Analysis failed
AnalysedDocument(Document document, Exception exception) {...}
// Deserialisation failed
AnalysedDocument(String rawValue, Exception exception) {...}
}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedDocument> analysedDocumentStream = builder
.stream(Serdes.String(), Serdes.String(), "master")
.mapValues(new ValueMapper<String, AnalysedDocument>() {
@Override
public AnalysedDocument apply(String rawValue) {
Document document;
try {
// Deserialise
document = ...
} catch (Exception e) {
return new AnalysedDocument(rawValue, e);
}
try {
// Perform analysis
Analysis analysis = ...
return new AnalysedDocument(document, analysis);
} catch (Exception e) {
return new AnalysedDocument(document, e);
}
}
});
// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
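For reference, the "branch based on ..." comment in the code above can be filled in with KStream#branch, which splits one stream into several by a list of predicates. A sketch, assuming the AnalysedDocument exception field is accessible at this point (branch() was the splitting API in the Kafka Streams versions discussed here; it was later superseded by split()):

```java
// Branch on whether the mapper captured an exception. Predicates are tried
// in order and each record goes to the first branch that matches, so the
// catch-all (key, value) -> true must come last.
@SuppressWarnings("unchecked")
KStream<String, AnalysedDocument>[] branches = analysedDocumentStream.branch(
    (key, value) -> value.exception != null,  // errorStream
    (key, value) -> true                      // successStream
);
KStream<String, AnalysedDocument> errorStream = branches[0];
KStream<String, AnalysedDocument> successStream = branches[1];
```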
Any help greatly appreciated.
Answer
Right now, Kafka Streams offers only limited error handling capabilities. There is work in progress to simplify this. For now, your overall approach seems to be a good way to go.
One comment about handling de/serialization errors: handling those errors manually requires you to do de/serialization "manually". This means you need to configure ByteArraySerdes for key and value on the input/output topics of your Streams app and add a map() that does the de/serialization (i.e., KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType> -- or the other way round if you also want to catch serialization exceptions). Otherwise, you cannot try-catch deserialization exceptions.
With your current approach, you "only" validate that the given string represents a valid document -- but it could be the case that the message itself is corrupted and cannot be converted into a String in the source operator in the first place. Thus, you don't actually cover deserialization exceptions with your code. However, if you are sure a deserialization exception can never happen, your approach would be sufficient, too.
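To make the manual step concrete, here is a self-contained sketch of just the mapping logic: raw bytes come in (as they would with a ByteArraySerde) and the decode is wrapped in a try-catch, so a poison message yields an error record instead of killing the stream thread. Result is a hypothetical stand-in for the question's AnalysedDocument, and the UTF-8 decode stands in for real deserialization:

```java
import java.nio.charset.StandardCharsets;

class DeserExample {
    // Stand-in for the question's AnalysedDocument: either a value or an error.
    static class Result {
        final String value;     // decoded payload, null on failure
        final Exception error;  // null on success
        Result(String value, Exception error) { this.value = value; this.error = error; }
    }

    // Body of the map() step: never throws, always returns a Result, so a
    // downstream branch can route failures to the error topic.
    static Result decode(byte[] raw) {
        try {
            if (raw == null) {
                throw new IllegalArgumentException("null payload");
            }
            return new Result(new String(raw, StandardCharsets.UTF_8), null);
        } catch (Exception e) {
            return new Result(null, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode("hello".getBytes(StandardCharsets.UTF_8)).value);
        System.out.println(decode(null).error != null);
    }
}
```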
Update
This issue is tackled via KIP-161 and will be included in the next release, 1.0.0. It allows you to register a callback via the parameter default.deserialization.exception.handler. The handler will be invoked every time an exception occurs during deserialization and allows you to return a DeserializationHandlerResponse (CONTINUE -> drop the record and move on, or FAIL, which is the default).
Update 2
With KIP-210 (which will be part of Kafka 1.1) it's also possible to handle errors on the producer side, similar to the consumer part, by registering a ProductionExceptionHandler via the config default.production.exception.handler that can return CONTINUE.