apache-kafka-streams相关内容

KTable 无法从物化视图中获取数据

我在 Spring Boot 中使用 Kafka Streams.在我的用例中,当我收到客户事件时,我需要将其存储在 customer-store 物化视图中,当我收到订单事件时,我需要加入客户并订购,然后将结果存储在 customer 中-order 物化视图. StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stor ..
发布时间:2021-11-12 03:40:16 其他开发

Kafka Stream 在重新平衡时重新处理旧消息

我有一个 Kafka Streams 应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题. 这是我的Kafka集群的配置: 5 个 Kafka 经纪人Kafka 主题 - 15 个分区和复制因子 3. 我的 Kafka Streams 应用程序与我的 Kafka 代理运行在同一台机器上. 每小时消耗/产生几百万条记录.每当我关闭代理时,应用程序都会进入重新平衡状态,并且在 ..

Kafka 流:从应用程序的每个实例中的所有分区读取

使用 KTable 时,当实例/消费者数量等于分区数量时,Kafka 流不允许实例从特定主题的多个分区中读取.我尝试使用 GlobalKTable 来实现这一点,这样做的问题是数据将被覆盖,并且无法对其应用聚合. 假设我有一个名为“data_in"的主题,有 3 个分区(P1、P2、P3).当我运行 Kafka 流应用程序的 3 个实例(I1、I2、I3)时,我希望每个实例都从“data_i ..
发布时间:2021-11-12 03:40:07 Java开发

kafka KStream - 需要 n 秒计数的拓扑

我有一个 JSON 对象流,我键入了几个值的散列.我希望在 n 秒(10?60?)间隔内按键计数,并使用这些值进行一些模式分析. 我的拓扑:K->aggregateByKey(n seconds)->process() 在 process - init() 步骤中,我调用了 ProcessorContent.schedule(60 * 1000L) 以希望获得 .punctuate() ..
发布时间:2021-11-12 03:40:01 Java开发

如何在窗口流媒体etl中显示中间结果?

我们目前在事件存储中实时聚合数据.这个想法是可视化多个时间范围(每月、每周、每天、每小时)和多个名义键的交易数据.我们经常有迟到的数据,所以我们需要考虑到这一点.此外,要求显示“运行"结果,即当前窗口完成之前的值. 目前我们正在使用 Kafka 和 Apache Storm(特别是 Trident,即微批处理)来做到这一点.我们的架构大致如下: (为我丑陋的照片道歉).我们使用 Mon ..

Kafka Streams 表转换

我在 SQL Server 中有一个表,我想将其流式传输到 Kafka 主题,结构如下: (用户 ID,报告 ID) 此表将不断更改(添加、插入、不更新记录) 我想把它转换成这种结构并放到 Elasticsearch 中: {“用户ID":1,“报告":[1, 2, 3, 4, 5, 6]} 到目前为止我看到的例子是日志或点击流,但在我的情况下不起作用. 这种用例可能吗?我总是 ..

状态存储可能已迁移到另一个实例

当我尝试从流访问状态存储时,出现以下错误. 状态存储 count-store 可能已迁移到另一个实例. 当我尝试从商店访问 ReadOnlyKeyValueStore 时,在迁移到其他服务器时收到错误消息.但我只有一个经纪人正在运行 /****/包 com.ms.kafka.com.ms.stream;导入 java.util.Properties;导入 java.util.stre ..
发布时间:2021-11-12 03:39:47 其他开发

应该由 KTable 发出的事件

我正在尝试测试作为最后一个节点具有 KTable 的拓扑.我的测试使用的是成熟的 Kafka 集群(通过 Confluent 的 Docker 镜像),所以我不使用 TopologyTestDriver. 我的拓扑有键值类型的输入 String ->Customer 和 String 的输出 ->客户映射.serdes、模式和与模式注册表的集成都按预期工作. 我正在使用 Scala、K ..
发布时间:2021-11-12 03:39:41 其他开发

为什么kafka流会重新处理broker重启后产生的消息

我有一个单节点 kafka 代理和简单的流应用程序.我创建了 2 个主题(topic1 和 topic2). 产生于 topic1 - 处理过的消息 - 写入 topic2 注意:对于产生的每条消息,只有一条消息写入目标主题 我生成了一条消息.写入 topic2 后,我停止了 kafka 代理.一段时间后,我重新启动了代理并在 topic1 上生成了另一条消息.现在流应用程序处理 ..
发布时间:2021-11-12 03:39:38 Java开发

Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

我有多个冗余应用实例,它们想要使用一个主题的所有事件并独立存储它们以进行磁盘查找(通过 RocksDB). 为了论证起见,让我们假设这些多余的消费者正在为无状态的 http 请求提供服务;所以负载不是使用 kafka 共享的,而是使用 kafka 将数据从生产者复制到每个实例本地存储中. 查看生成的主题时,每个消费应用创建了 3 个额外的主题: {topicname}STATE- ..
发布时间:2021-11-12 03:39:35 其他开发

Kafka Streams - 低级处理器 API - RocksDB TimeToLive(TTL)

我正在尝试使用低级处理器 API.我正在使用处理器 API 对传入记录进行数据聚合,并将聚合记录写入 RocksDB. 但是,我想保留在rocksdb中添加的记录仅在24小时内处于活动状态.24 小时后应删除该记录.这可以通过更改 ttl 设置来完成.但是,没有太多文档可以帮助我解决这个问题. 如何更改 ttl 值?我应该使用什么 java api 将 ttl 时间设置为 24 小时, ..
发布时间:2021-11-12 03:39:33 其他开发

我可以在与 Kafka Broker 相同的机器上运行 Kafka Streams 应用程序吗?

我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它 ..

Kafka KStream 应用程序 - 临时文件清理

似乎我的基于 KStream 的应用程序堆积了许多 GB 的文件(.sst、Log.old. 等). 这些会自行清理还是我需要密切关注?要设置一些参数来剔除它们? 解决方案 关于这些本地/临时文件:其中一些文件是应用程序状态,它们应该占消耗的大部分空间.您的应用程序可能会“堆积"许多 GB 的文件,这仅仅是因为您的应用程序实际上正在管理大量状态.如果删除这些文件,可以通过从 Kafk ..
发布时间:2021-11-12 03:39:27 Java开发

Kafka Streams:RocksDB TTL

我知道默认 TTL 设置为无穷大(非正).但是,如果我们需要在存储中保留最多 2 天的数据,我们是否可以使用 RocksDBConfigSetter 接口实现进行覆盖,即 options.setWalTtlSeconds(172800)?或者它会与 Kafka 流内部发生冲突吗? 参考:https://docs.confluent.io/current/streams/developer-g ..
发布时间:2021-11-12 03:39:16 其他开发