apache-kafka-streams相关内容

未能刷新状态存储

我正在尝试在 Kafka Streams 中创建一个 leftJoin,它可以正常工作大约 10 条记录,然后它崩溃了,由 NullPointerException 引起的异常使用这样的代码: 私有静态 KafkaStreams getKafkaStreams() {StreamsConfig StreamsConfig = new StreamsConfig(getProperties()); ..
发布时间:2021-11-12 03:41:18 其他开发

Kafka Streams - 跳跃窗口 - 去重密钥

我正在每 5 分钟推进一次的 4 小时窗口上进行跳跃窗口聚合.由于跳跃窗口重叠,我得到了具有不同聚合值的重复键. TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60* 1000L) 如何消除具有重复数据的重复键或仅选择包含最新值的键. 解决方案 2021 年 5 月更新: Kafka Streams API 支持 “final"现在窗口结 ..
发布时间:2021-11-12 03:41:07 其他开发

仅在流上收到新事件时才抑制触发事件

我使用的是 Kafka 流 2.2.1. 我使用抑制来阻止事件直到窗口关闭.我正在使用事件时间语义.但是,只有在流上有新消息可用时才会触发触发消息. 提取以下代码对问题进行采样: KStream[] 分支 = 是.branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),(键,味精)->"b".equalsIgnore ..
发布时间:2021-11-12 03:41:01 其他开发

使用 Spring Cloud Stream kafka 动态更改实例索引

类似于:更改 spring-cloud-stream 实例索引/运行时计数 我在微服务器架构中对批处理的启动做了一个 poc,我正在将 Spring 批处理与 Spring Cloud Stream Kafka 一起使用.我正在寻找一种方法来动态创建消费者(处理器)应用程序的多个实例.我看到可以用 定义多个实例 spring.cloud.stream.instanceCount=nspr ..

从主题中的特定分区流式传输(Kafka Streams)

据我阅读 Kafka Streams 文档后的理解,不可能将它用于仅从给定主题的一个分区流式传输数据,人们总是必须完整阅读. 正确吗? 如果是,未来是否有计划为 API 提供这样的选项? 解决方案 不,你不能这样做,因为内部消费者订阅主题,加入通过 application-id 指定的消费者组,所以分区是自动分配的.顺便说一句,你为什么要这样做?如果不重新平衡,您将失去 Kaf ..
发布时间:2021-11-12 03:40:53 其他开发

Kafka 流聚合是否有任何排序保证?

我的 Kafka 主题包含由 deviceId 键控的状态.我想使用 KStreamBuilder.stream().groupByKey().aggregate(...) 只在 TimeWindow 中保留状态的最新值.我猜想,只要主题按键分区,聚合函数总是可以以这种方式返回最新值: (key, value, old_value) ->价值 这是我可以从 Kafka Streams ..
发布时间:2021-11-12 03:40:50 其他开发

Kafka-streams:为什么所有分区都分配给消费者组中的同一个消费者?

背景 多台机器生成事件.这些事件被发送到我们的 Kafka 集群,其中每台机器都有自己的主题 (app.machine-events.machine-name).因为在每台机器的基础上顺序很重要,而且分区大小现在不是问题,所以所有主题都由一个分区组成.因此,N 个主题目前也意味着 N 个分区. 消费/处理应用程序使用 kafka-streams,我们给了 StreamsConfig.A ..
发布时间:2021-11-12 03:40:39 其他开发

Kafka Streams 持久存储错误:状态存储,可能已迁移到另一个实例

我在 Spring Boot 中使用 Kafka Streams.在我的用例中,当我从其他微服务接收客户事件时,我需要存储在 customer 物化视图中,当我收到 order 事件时,我需要加入客户并订购然后存储在客户订单物化视图中.为了实现这一点,我创建了持久键值存储 customer-store 和 当有新事件发生时更新它. StoreBuilder customerStateStore ..
发布时间:2021-11-12 03:40:33 其他开发

Kafka Streams - 共享变更日志主题

这是一个后续问题:Kafka Streams- 如何扩展 Kafka 存储生成的变更日志主题 让我们假设流消费者需要在存储数据之前进行一些转换(通过 v->k 而不是 k->v 索引). 最后,目标是每个消费者都需要将完整的转换记录集 (v->k) 存储在 RocksDB 中.我知道上游的另一个处理器可以根据 k->v 来处理生成 v->k 并且最终消费者可以简单地从全局表中具体化新主 ..
发布时间:2021-11-12 03:40:30 其他开发

KafkaStreams 同一应用程序中的多个流

我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策. 假设我有两个不同的事件要放入 KTable 中.我有一个生产者将这些消息发送到正在侦听该主题的 KStream. 据我所知,我不能对使用 KafkaStreams 的消息使用条件转发,所以如果流订阅了许多主题(例如,上述每个消息一个),我只能调用stream.to 在单个接收器主题上 - 否则,我将不得不在流 ..
发布时间:2021-11-12 03:40:27 其他开发