External processing using Kafka Streams


Question

There are several questions regarding message enrichment using external data, and the recommendation is almost always the same: ingest external data using Kafka Connect and then join the records using state stores. Although it fits in most cases, there are several other use cases in which it does not, such as IP to location and user agent detection, to name a few.

Enriching a message with an IP-based location usually requires a lookup by a range of IPs, but currently, there is no built-in state store that provides such capability. For user agent analysis, if you rely on a third-party service, you have no choices other than performing external calls.

We spent some time thinking about it, and we came up with the idea of implementing a custom state store on top of a database that supports range queries, like Postgres. We could also abstract an external HTTP or gRPC service behind a state store, but we are not sure whether that is the right way.

In that sense, what is the recommended approach when you cannot avoid querying an external service during stream processing but still must guarantee fault tolerance? What happens when an error occurs while the state store is retrieving data (a request fails, for instance)? Does Kafka Streams retry processing the message?

Answer

Generally, KeyValueStore#range(fromKey, toKey) is supported by the built-in stores. Thus, it would be good to understand what kind of range queries you are trying to do. Also note that, internally, everything is stored as byte[] arrays and RocksDB (the default storage engine) sorts data accordingly -- hence, you can actually implement quite sophisticated range queries if you reason about the byte layout and pass corresponding "prefix keys" into #range().
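The byte-layout idea can be illustrated outside Kafka Streams. The sketch below is a hypothetical, self-contained simulation: it uses a plain Java TreeMap keyed by the packed start address of each IPv4 range (the class name, the packing scheme, and the sample ranges are all assumptions, not from the original question). A floorEntry() lookup finds the range containing a given IP, which is analogous to taking the last entry of store.range(MIN, ipKey) against a RocksDB-backed store whose byte[] keys sort lexicographically.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not the Kafka Streams API): simulates an
// IP-range lookup the way it could be laid out in a RocksDB-backed
// KeyValueStore. The key is the packed IPv4 start address of a range;
// floorEntry() finds the candidate range for an IP.
public class IpRangeLookup {
    private static final class Range {
        final long end;
        final String location;
        Range(long end, String location) { this.end = end; this.location = location; }
    }

    private final TreeMap<Long, Range> ranges = new TreeMap<>();

    // Pack a dotted-quad IPv4 address into an unsigned 32-bit value,
    // so that numeric ordering matches big-endian byte[] ordering.
    static long pack(String ip) {
        long v = 0;
        for (String octet : ip.split("\\.")) {
            v = (v << 8) | Integer.parseInt(octet);
        }
        return v;
    }

    public void put(String startIp, String endIp, String location) {
        ranges.put(pack(startIp), new Range(pack(endIp), location));
    }

    public String lookup(String ip) {
        long key = pack(ip);
        Map.Entry<Long, Range> e = ranges.floorEntry(key); // last range starting at or before ip
        if (e == null || key > e.getValue().end) {
            return null; // the IP falls in a gap between known ranges
        }
        return e.getValue().location;
    }

    public static void main(String[] args) {
        IpRangeLookup store = new IpRangeLookup();
        store.put("10.0.0.0", "10.0.0.255", "dc-east");
        store.put("192.168.0.0", "192.168.255.255", "office");
        System.out.println(store.lookup("10.0.0.42"));   // dc-east
        System.out.println(store.lookup("192.168.3.7")); // office
        System.out.println(store.lookup("8.8.8.8"));     // null (no matching range)
    }
}
```

The same ordering trick is what makes #range() usable for IP lookups: serialize the IP as a big-endian byte[] key and the store's lexicographic sort lines up with numeric IP order.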

If you really need to call an external service, you have "two" options to not lose data: if an external call fails, throw an exception and let Kafka Streams die. This is obviously not a real option; however, if you swallow the error from the external lookup, you would "skip" the input message and it would go unprocessed. Kafka Streams cannot know that processing "failed" (it does not know what your code does) and will not "retry"; it considers the message completed (similar to filtering it out).

Hence, to make it work, if the external call fails you would need to put all the data used to trigger the lookup into a state store and retry later (i.e., do a lookup into the store to find unprocessed data and retry). This retry can be a "side task" when you process the next input message, or you can schedule a punctuation to implement the retry. Note that this mechanism changes the order in which records are processed, which may or may not be acceptable for your use case.
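The park-and-retry pattern described above can be sketched in plain Java. This is a hedged simulation, not the Kafka Streams Processor API: the `pending` map stands in for a KeyValueStore of unprocessed records, process() for a processor's per-record callback, punctuate() for a scheduled punctuator, and the external service for any function that may throw. All names here are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;

// Hedged sketch of the retry pattern: on external-call failure the
// record is parked in a "state store" instead of being lost, and a
// periodic punctuate() pass retries everything still pending.
public class RetryingEnricher {
    private final Function<String, String> externalLookup;       // may throw
    private final Map<String, String> pending = new HashMap<>(); // simulated state store
    private final Map<String, String> output = new HashMap<>();  // simulated downstream

    public RetryingEnricher(Function<String, String> externalLookup) {
        this.externalLookup = externalLookup;
    }

    // Called per input record: try the external call; on failure, park
    // the record for a later retry rather than dropping it.
    public void process(String key, String value) {
        try {
            output.put(key, value + "|" + externalLookup.apply(value));
        } catch (RuntimeException e) {
            pending.put(key, value);
        }
    }

    // Called periodically (in Kafka Streams: a wall-clock punctuation):
    // scan the store and retry every record that is still unprocessed.
    public void punctuate() {
        Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> e = it.next();
            try {
                output.put(e.getKey(), e.getValue() + "|" + externalLookup.apply(e.getValue()));
                it.remove(); // success: drop it from the retry store
            } catch (RuntimeException ex) {
                // still failing: keep it for the next punctuation
            }
        }
    }

    public Map<String, String> output() { return output; }
    public int pendingCount() { return pending.size(); }
}
```

Note that records recovered by punctuate() are emitted later than records that succeeded on the first attempt, which is exactly the reordering caveat mentioned above.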
