Kafka Streams Internal Data Management


Question


In my company, we are using Kafka extensively, but we have been using a relational database to store the results of several intermediary transformations and aggregations, for fault-tolerance reasons. Now we are exploring Kafka Streams as a more natural way to do this. Often, our needs are quite simple; one such case is:

  • Listen to an input queue of <K1,V1>, <K2,V2>, <K1,V2>, <K1,V3>...
  • For each record, perform some high latency operation (call a remote service)
  • If, by the time <K1,V1> is processed, both <K1,V2> and <K1,V3> have been produced, then I should process only V3, as V2 has already become stale


In order to achieve this, I am reading the topic as a KTable. The code looks like this:

KStreamBuilder builder = new KStreamBuilder();
// Read the topic as a table: each key holds only its latest value
KTable<String, String> kTable = builder.table("input-topic");
// Forward every table update to the remote service
kTable.toStream().foreach((k, v) -> client.post(v));
return builder;
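
For completeness, here is a minimal sketch of how such a builder might be wired into a running application on the 0.11.0 API; the application id and broker address are placeholder assumptions, not part of the original code:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-poster");      // assumed application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
// KStreamBuilder extends TopologyBuilder, so it can be passed to KafkaStreams directly
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();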


This works as expected, but it is not clear to me how Kafka automagically achieves this. I was assuming that Kafka creates internal topics to achieve this, but I do not see any internal topics created. The Javadoc for the method seems to confirm this observation. But then I came across this official page, which seems to suggest that Kafka uses a separate datastore, namely RocksDB, along with a changelog topic.


Now I am confused about the circumstances under which a changelog topic is created. My questions are:

  1. If the default behaviour of state stores is fault-tolerant, as the official page suggests, then where is the state stored? In RocksDB, in the changelog topic, or in both?
  2. What are the implications of relying on RocksDB in production? (EDITED)
    1. As I understand it, the dependency on RocksDB is transparent (just a jar file), and RocksDB stores its data on the local file system. But this also means that, in our case, the application will maintain a copy of the sharded data on the storage where the application runs. When we replace a remote database with a KTable, this has storage implications, and that is my point.
    2. Will Kafka releases take care that RocksDB keeps working on various platforms? (Since it seems to be platform-dependent and not written in Java.)

  3. Does it make sense to log compact the input topic?

I am using v. 0.11.0

Answer


1. Kafka Streams stores state locally, by default using RocksDB. However, local state is ephemeral. For fault tolerance, all updates to a store are also written into a changelog topic. This allows the store to be rebuilt and/or migrated in case of failure or of scaling in/out. For your special case, no changelog topic is created, because the KTable is not the result of an aggregation but is populated directly from a topic; this is only an optimization. Since the changelog topic would contain exactly the same data as the input topic, Kafka Streams avoids the data duplication and uses the input topic as the changelog topic in case of an error scenario.
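
To make the distinction concrete, here is a hedged sketch on the 0.11.0-style API (the topic and the "counts" store name are assumptions): the direct table() read creates no extra changelog topic, while the aggregation result is backed by an internal changelog topic named <application.id>-counts-changelog.

// Direct read: the input topic itself serves as the changelog,
// so no internal changelog topic is created for this store
KTable<String, String> direct = builder.table("input-topic");

// Aggregation result: updates to the "counts" store are additionally
// written to an internal changelog topic
KTable<String, Long> counts = builder.<String, String>stream("input-topic")
    .groupByKey()
    .count("counts");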


2. Not sure exactly what you mean by this question. Note that RocksDB is considered an ephemeral store. It is used by default for various reasons, as discussed here: Why Apache Kafka Streams uses RocksDB and if how is it possible to change it? (for example, it allows holding state larger than main memory, because it can spill to disk). You can replace RocksDB with any other store; Kafka Streams also ships with an in-memory store (a sketch follows after the next two points). (Edit)

  1. Correct. You need to provision your application accordingly so that it can store the local shards of the overall state. There is a sizing guide: https://docs.confluent.io/current/streams/sizing.html


  2. RocksDB is written in C++ and included via a JNI binding. There are some known issues on Windows, because RocksDB does not provide pre-compiled binaries for all versions of Windows. As long as you stay on a Linux-based platform, it should work. The Kafka community runs upgrade tests for RocksDB to make sure it is compatible.
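
Two hedged sketches for the points above (the path and the store name are assumptions, not prescriptions): state.dir controls where the local RocksDB shards live, and the 0.11.0 Stores factory lets an aggregation use the bundled in-memory store instead of RocksDB.

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.Stores;

// Provisioning: local state lives under state.dir, so size that volume for
// your application's shard of the overall state (the path is an assumption)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

// Replacing RocksDB: supply an in-memory store to an aggregation instead;
// its updates are still backed by a changelog topic for fault tolerance
KTable<String, Long> counts = builder.<String, String>stream("input-topic")
    .groupByKey()
    .count(Stores.create("counts-mem")   // assumed store name
        .withStringKeys()
        .withLongValues()
        .inMemory()
        .build());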


3. Yes. Kafka Streams actually assumes that the input topic for a table() operation is log-compacted. Otherwise, there is a risk of data loss in case of failure. (Edit)

If you enable log compaction, the retention time setting is ignored. Thus, yes, the latest update will be maintained forever (or until a tombstone message with value=null is written). Note that when compaction is executed on the broker side, old data is garbage collected; thus, on restore, only the latest version per key is read, because old versions were removed during the compaction process. If you are not interested in some data after some period of time, you need to write a tombstone into the source topic to make this work.
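
As a hedged end-to-end illustration (broker address, partition count, replication factor, and key are assumptions; the snippet assumes an enclosing method that throws Exception): the input topic can be created log-compacted via the 0.11.0 AdminClient, and a key can be expired explicitly by producing a tombstone, i.e. a record with a null value.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties cfg = new Properties();
cfg.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker

// Create the input topic with log compaction, so the latest value per key survives
try (AdminClient admin = AdminClient.create(cfg)) {
    NewTopic topic = new NewTopic("input-topic", 4, (short) 1); // partitions/replication assumed
    topic.configs(Collections.singletonMap("cleanup.policy", "compact"));
    admin.createTopics(Collections.singleton(topic)).all().get();
}

// Expire key "K1" explicitly by writing a tombstone (value = null) into the source topic
cfg.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
cfg.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(cfg)) {
    producer.send(new ProducerRecord<>("input-topic", "K1", null));
}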
