apache-kafka-streams相关内容
我在 Spring Boot 中使用 Kafka Streams.在我的用例中,当我收到客户事件时,我需要将其存储在 customer-store 物化视图中,当我收到订单事件时,我需要加入客户并订购,然后将结果存储在 customer 中-order 物化视图. StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stor
..
我有一个 Kafka Streams 应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题. 这是我的Kafka集群的配置: 5 个 Kafka 经纪人Kafka 主题 - 15 个分区和复制因子 3. 我的 Kafka Streams 应用程序与我的 Kafka 代理运行在同一台机器上. 每小时消耗/产生几百万条记录.每当我关闭代理时,应用程序都会进入重新平衡状态,并且在
..
我正在尝试在此处构建 Materialized.as DSL 代码:https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/Stores.html 但我收到错误 不兼容的类型:org.apache.kafka.common.serialization.Serde无法转换为 org.apache.kafka.com
..
使用 KTable 时,当实例/消费者数量等于分区数量时,Kafka 流不允许实例从特定主题的多个分区中读取.我尝试使用 GlobalKTable 来实现这一点,这样做的问题是数据将被覆盖,并且无法对其应用聚合. 假设我有一个名为“data_in"的主题,有 3 个分区(P1、P2、P3).当我运行 Kafka 流应用程序的 3 个实例(I1、I2、I3)时,我希望每个实例都从“data_i
..
我有一个 JSON 对象流,我键入了几个值的散列.我希望在 n 秒(10?60?)间隔内按键计数,并使用这些值进行一些模式分析. 我的拓扑:K->aggregateByKey(n seconds)->process() 在 process - init() 步骤中,我调用了 ProcessorContent.schedule(60 * 1000L) 以希望获得 .punctuate()
..
我们目前在事件存储中实时聚合数据.这个想法是可视化多个时间范围(每月、每周、每天、每小时)和多个名义键的交易数据.我们经常有迟到的数据,所以我们需要考虑到这一点.此外,要求显示“运行"结果,即当前窗口完成之前的值. 目前我们正在使用 Kafka 和 Apache Storm(特别是 Trident,即微批处理)来做到这一点.我们的架构大致如下: (为我丑陋的照片道歉).我们使用 Mon
..
我在 SQL Server 中有一个表,我想将其流式传输到 Kafka 主题,结构如下: (用户 ID,报告 ID) 此表将不断更改(添加、插入、不更新记录) 我想把它转换成这种结构并放到 Elasticsearch 中: {“用户ID":1,“报告":[1, 2, 3, 4, 5, 6]} 到目前为止我看到的例子是日志或点击流,但在我的情况下不起作用. 这种用例可能吗?我总是
..
我有一个流函数KStream[] branch(final Predicate... predicates).我想动态创建一个谓词列表.这可能吗? KStream[] 分支 = 流.map((key, event) ->丰富(key, event)).branch(getStrategies());[...]私有列表>获取策略(){ArrayList>谓词 = new ArrayList();
..
当我尝试从流访问状态存储时,出现以下错误. 状态存储 count-store 可能已迁移到另一个实例. 当我尝试从商店访问 ReadOnlyKeyValueStore 时,在迁移到其他服务器时收到错误消息.但我只有一个经纪人正在运行 /****/包 com.ms.kafka.com.ms.stream;导入 java.util.Properties;导入 java.util.stre
..
StreamsBuilder builder = new StreamsBuilder();映射serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);Serde keySerde= getSerde(keyClass);keySerde.configure(serdeConfig,
..
我正在尝试测试作为最后一个节点具有 KTable 的拓扑.我的测试使用的是成熟的 Kafka 集群(通过 Confluent 的 Docker 镜像),所以我不使用 TopologyTestDriver. 我的拓扑有键值类型的输入 String ->Customer 和 String 的输出 ->客户映射.serdes、模式和与模式注册表的集成都按预期工作. 我正在使用 Scala、K
..
我有一个单节点 kafka 代理和简单的流应用程序.我创建了 2 个主题(topic1 和 topic2). 产生于 topic1 - 处理过的消息 - 写入 topic2 注意:对于产生的每条消息,只有一条消息写入目标主题 我生成了一条消息.写入 topic2 后,我停止了 kafka 代理.一段时间后,我重新启动了代理并在 topic1 上生成了另一条消息.现在流应用程序处理
..
我有多个冗余应用实例,它们想要使用一个主题的所有事件并独立存储它们以进行磁盘查找(通过 RocksDB). 为了论证起见,让我们假设这些多余的消费者正在为无状态的 http 请求提供服务;所以负载不是使用 kafka 共享的,而是使用 kafka 将数据从生产者复制到每个实例本地存储中. 查看生成的主题时,每个消费应用创建了 3 个额外的主题: {topicname}STATE-
..
我正在尝试使用低级处理器 API.我正在使用处理器 API 对传入记录进行数据聚合,并将聚合记录写入 RocksDB. 但是,我想保留在rocksdb中添加的记录仅在24小时内处于活动状态.24 小时后应删除该记录.这可以通过更改 ttl 设置来完成.但是,没有太多文档可以帮助我解决这个问题. 如何更改 ttl 值?我应该使用什么 java api 将 ttl 时间设置为 24 小时,
..
我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它
..
似乎我的基于 KStream 的应用程序堆积了许多 GB 的文件(.sst、Log.old. 等). 这些会自行清理还是我需要密切关注?要设置一些参数来剔除它们? 解决方案 关于这些本地/临时文件:其中一些文件是应用程序状态,它们应该占消耗的大部分空间.您的应用程序可能会“堆积"许多 GB 的文件,这仅仅是因为您的应用程序实际上正在管理大量状态.如果删除这些文件,可以通过从 Kafk
..
假设您有一个带有空键的主题,其值为 {id:1, name:Chris, age:99} 假设您想按姓名计算人数.您可以执行以下操作: nameStream.groupBy((key,value) -> value.getName()).数数(); 现在让我们说它是有效的,你可以获得重复的记录,你可以根据 id 判断它是重复的. 例如: {id:1, name:Chris, age
..
我在使用 kafka 流时有时会发现 UNKNOWN_PRODUCER_ID 异常. 2018-06-25 10:31:38.329 WARN 1 --- [-1-1_0-producer] Oakclients.producer.internals.Sender : [Producer clientId=default-groupz-7bd94946-3bc0-4400-8e73-7126b9
..
使用 kafka-streams 0.10.0.0,我在转发消息时定期在 StreamTask 中看到空指针异常.它在 10% 到 50% 的调用之间变化.NPE 发生在这种方法中: public void forward(K 键,V 值){ProcessorNode thisNode = currNode;尝试 {for (ProcessorNode childNode : (List>) t
..
我知道默认 TTL 设置为无穷大(非正).但是,如果我们需要在存储中保留最多 2 天的数据,我们是否可以使用 RocksDBConfigSetter 接口实现进行覆盖,即 options.setWalTtlSeconds(172800)?或者它会与 Kafka 流内部发生冲突吗? 参考:https://docs.confluent.io/current/streams/developer-g
..