apache-kafka-streams相关内容

如何使用 ksql 在 Kafka 的时间窗口内在聚合之上执行聚合

我有一堆防火墙数据.我想: A) 将每个 IP 每小时的字节数相加,然后 B) 计算该小时内所有 IP 的最小和最大总和 我已经能够在 Kafka 中做 A,但是,我不知道如何做 B.我一直在研究文档,感觉自己快要接近了,但我似乎总是只找到了一部分解决方案. 我的 firewall_stream 运行良好. client.create_stream(table_name= ..
发布时间:2021-11-12 03:42:14 其他开发

Kafka-streams 延迟启动消费者正常关闭的重新平衡

这是对我之前发送的有关 Kafka Streams 中的高延迟问题的跟进;(Kafka Streams 重新平衡高吞吐量 kafka 上的延迟峰值- 流服务). 提醒一下,我们的无状态服务有非常严格的延迟要求,而且我们面临着延迟过高的问题(有些消息在生成后消耗超过 10 秒),特别是当消费者优雅地离开组时. 经过进一步调查,我们发现至少对于小型消费群体而言,重新平衡的时间不到 500 ..
发布时间:2021-11-12 03:42:11 Java开发

Kafka Stream:KTable 物化

如何确定主题的 KTable 实现何时完成? 例如假设 KTable 有几百万行.伪代码如下: KTablekt = kgroupedStream.groupByKey(..).reduce(..);//假设这会产生几百万行 在某个时间点,我想安排一个线程来调用以下内容,写入主题:kt.toStream().to("output_topic_name"); 我想确保所有数据都作为上 ..
发布时间:2021-11-12 03:42:06 其他开发

如何使用相同的 APPLICATION_ID_CONFIG 运行两个或多个拓扑?

我想在同一个实例上运行 2 个拓扑.1个拓扑涉及状态存储,其他涉及全局存储.我如何成功地做到这一点? 我创建了 1 个包含 3 个分区的主题,然后在 1 个拓扑中添加了一个状态存储,在第二个拓扑中添加了一个全局存储. 拓扑 1: public void createTopology() {拓扑拓扑 = new Topology();topology.addSource("sourc ..
发布时间:2021-11-12 03:41:51 其他开发

如何在kafka消费者群体的情况下引入重新平衡延迟?

我想给我的消费者一些时间来重新启动,以免发生不必要的重新平衡.我怎样才能做到这一点?在关闭的情况下,我希望复制出现,一段时间后如果消费者没有备份,则应该发生重新平衡,否则不会发生. 解决方案 您可以调整名为 group.initial.rebalance.delay.ms 的代理级别配置. 群协调者等待更多消费者的时间在执行第一次重新平衡之前加入一个新组.更长的延迟意味着可能更少的重 ..
发布时间:2021-11-12 03:41:48 其他开发

根据部分数据属性更新KTable

我正在尝试使用对象的部分数据更新 KTable.例如.用户对象是{"id":1, "name":"Joe", "age":28}对象被流式传输到一个主题中,并按密钥分组到 KTable 中.现在用户对象部分更新如下 {"id":1, "age":33} 并流式传输到表中.但更新后的表看起来如下 {"id":1, "name":null, "age":28}.预期输出为 {"id":1, "name ..

如何在两个 Kafka Streams 之间使用持久化的 StateStore

我在尝试通过 Kafka Streams 实现以下目标时遇到了一些麻烦: 在应用程序启动时,(压缩的)主题 alpha 被加载到键值 StateStore 映射中 Kafka Stream 从另一个主题中消费,使用 (.get) 上面的映射,最后在主题中生成一条新记录 alpha 结果是内存映射应该与基础主题对齐,即使流媒体重新启动也是如此. 我的方法如下: val builde ..
发布时间:2021-11-12 03:41:42 其他开发

Kafka 使用了哪些内部主题?

我们使用 kafka 流 api 进行聚合,其中我们也使用 group by.我们还使用状态存储来保存输入主题数据. 我注意到的是 Kafka内部创建了3种topic Changelog-- 重新分区-- - 我无法理解的是 为什么当我拥有 - 中的所有数据时,它会创建变更日志主题重新分区主题是否包含分组后的数据. 而且我看到 Changelog 和 topi ..
发布时间:2021-11-12 03:41:39 其他开发

如何在spring boot中配置两个Kafka StreamsBuilderFactoryBean实例

使用 spring-boot-2.1.3、spring-kafka-2.2.4,我想有两个流配置(例如有不同的 application.id,或连接到不同的集群等).所以我几乎根据文档定义了第一个流配置,然后添加了第二个,具有不同的名称,以及第二个 StreamsBuilderFactoryBean(也具有不同的名称): @Bean(name = KafkaStreamsDefaultConfi ..
发布时间:2021-11-12 03:41:36 其他开发

Kafka Stream:消费者提交频率

至少一次保证,我知道在失败的情况下有可能重复.然而, 1) Kafka Stream 库执行提交的频率如何? 2) 除了上述之外,用户是否还需要考虑提交? 3)是否有关于提交频率的最佳实践? 解决方案 Kafka Streams 定期提交,可以通过参数 commit.interval.ms 进行配置(默认为 30 秒;如果恰好-启用处理后,默认为 100 毫秒). 通常,用户不 ..
发布时间:2021-11-12 03:41:30 其他开发

Kafka Streams 重新平衡高吞吐量 kafka-streams 服务的延迟峰值

我们开始使用 Kafka 流,我们的服务是一个非常简单的无状态消费者. 我们对延迟有严格的要求,当消费者群体重新平衡时,我们面临着过高的延迟问题.在我们的场景中,重新平衡会相对频繁地发生:滚动更新代码、扩展/缩减服务、容器被集群调度程序改组、容器死亡、硬件故障. 我们所做的第一个测试是让一个包含 4 个消费者的小型消费者组处理少量消息(1K/秒)并杀死其中一个;集群管理器(目前是 AW ..
发布时间:2021-11-12 03:41:27 Java开发

Kafka Streams 本地状态存储

我有一个简单的流应用程序,将一个主题作为输入流并将 KeyValues 转换为另一个主题,例如: StoreBuilder>建设者 =Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),Serdes.Long(), CATEGORY_JSON_SERDE);streamB ..
发布时间:2021-11-12 03:41:24 Java开发

Kafka Streams 内部数据管理

在我的公司,我们广泛使用 Kafka,但出于容错的原因,我们一直使用关系数据库来存储多个中间转换和聚合的结果.现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点.通常,我们的需求非常简单——其中一种情况是 监听、、、... 的输入队列对于每条记录,执行一些高延迟操作(调用远程服务) 如果在处理 时,并且 、 都已生成,则我应该处理 V3,因为 V2 已经过时了 ..
发布时间:2021-11-12 03:41:21 其他开发