External processing using Kafka Streams


Problem Description

There are several questions regarding message enrichment using external data, and the recommendation is almost always the same: ingest the external data using Kafka Connect and then join the records using state stores. Although this fits most cases, there are other use cases where it does not, such as IP-to-location mapping and user agent detection, to name a few.

Enriching a message with an IP-based location usually requires a lookup by a range of IPs, but currently there is no built-in state store that provides such a capability. For user agent analysis, if you rely on a third-party service, you have no choice other than performing external calls.

We spent some time thinking about it and came up with the idea of implementing a custom state store on top of a database that supports range queries, like Postgres. We could also abstract an external HTTP or gRPC service behind a state store, but we're not sure whether that is the right way.

In that sense, what is the recommended approach when you cannot avoid querying an external service during stream processing but still must guarantee fault tolerance? What happens when an error occurs while the state store is retrieving data (a request fails, for instance)? Does Kafka Streams retry processing the message?

Recommended Answer

Generally, KeyValueStore#range(fromKey, toKey) is supported by the built-in stores. Thus, it would be good to understand how the range queries you are trying to do would actually work. Also note that, internally, everything is stored as byte[] arrays, and RocksDB (the default storage engine) sorts the data accordingly. Hence, you can actually implement quite sophisticated range queries if you reason about the byte layout and pass corresponding "prefix keys" into #range().
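As a rough illustration of that idea, the sketch below scans a contiguous key range with KeyValueStore#range() from inside a processor. The store name "ip-blocks", the fixed-width zero-padded key format, and the matching logic are all assumptions for this example, not part of the answer:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: enrich each record by range-scanning a store of IP blocks.
public class IpRangeLookupProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> ipBlocks; // hypothetical "ip-blocks" store

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        this.ipBlocks = context.getStateStore("ip-blocks");
    }

    @Override
    public void process(Record<String, String> record) {
        // Zero-padded dotted quads ("010.002.000.000") sort the same way as
        // their serialized byte[] form, so a lexicographic scan over the /16
        // containing the key is also a byte-order range scan in RocksDB.
        String prefix = record.key().substring(0, 8); // e.g. "010.002."
        String location = null;
        try (KeyValueIterator<String, String> iter =
                 ipBlocks.range(prefix + "000.000", prefix + "255.255")) {
            while (iter.hasNext()) {
                KeyValue<String, String> block = iter.next();
                location = block.value; // pick the block that contains the key
            }
        }
        context.forward(record.withValue(record.value() + "|" + location));
    }
}

The store itself would be registered with the topology via Stores.keyValueStoreBuilder() and connected to this processor.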

If you really need to call an external service, you have "two" options to avoid losing data: if an external call fails, throw an exception and let Kafka Streams die. This is obviously not a real option; however, if you swallow the error from the external lookup, you "skip" the input message and it goes unprocessed. Kafka Streams cannot know that processing "failed" (it does not know what your code does) and will not "retry", but will consider the message completed (similar to filtering it out).
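A minimal sketch of the fail-fast option (externalLookup is a stand-in for whatever remote call you make):

import org.apache.kafka.streams.errors.StreamsException;

// Sketch: fail fast so that no record is ever silently skipped.
public class FailFastEnricher {

    public String enrich(String key) {
        try {
            return externalLookup(key); // hypothetical remote call
        } catch (Exception e) {
            // Rethrowing kills the stream thread. If you swallow the error
            // instead, Streams treats the record as fully processed, so it
            // is dropped and never retried.
            throw new StreamsException("external lookup failed for " + key, e);
        }
    }

    private String externalLookup(String key) throws Exception {
        throw new UnsupportedOperationException("stand-in for a remote call");
    }
}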

Hence, to make it work, you would need to put all the data used to trigger the lookup into a state store if the external call fails, and retry later (i.e., do a lookup into the store to find unprocessed data and retry). This retry can either be a "side task" performed when you process the next input message, or you can schedule a punctuation to implement the retry. Note that this mechanism changes the order in which records are processed, which might or might not be OK for your use case.
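Putting that together, a sketch of the park-and-retry pattern could look like the following. The store name "pending-lookups", the 30-second wall-clock interval, and externalLookup are assumptions; the punctuator re-attempts parked lookups, which is also where the reordering mentioned above comes from:

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: park failed lookups in a state store and retry from a punctuator.
public class RetryingEnrichmentProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> pending; // hypothetical "pending-lookups" store

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        this.pending = context.getStateStore("pending-lookups");
        // Re-attempt parked records every 30 seconds of wall-clock time.
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
                ts -> retryPending());
    }

    @Override
    public void process(Record<String, String> record) {
        try {
            context.forward(record.withValue(externalLookup(record.key())));
        } catch (Exception e) {
            // Park everything needed to redo the lookup; the punctuator picks it up.
            pending.put(record.key(), record.value());
        }
    }

    private void retryPending() {
        try (KeyValueIterator<String, String> iter = pending.all()) {
            while (iter.hasNext()) {
                KeyValue<String, String> kv = iter.next();
                try {
                    context.forward(new Record<>(kv.key, externalLookup(kv.key),
                            System.currentTimeMillis()));
                    pending.delete(kv.key); // succeeded, stop tracking it
                } catch (Exception e) {
                    // Still failing: leave it parked for the next punctuation.
                }
            }
        }
    }

    private String externalLookup(String key) throws Exception {
        throw new UnsupportedOperationException("stand-in for a remote call"); // hypothetical
    }
}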

