apache-kafka-streams相关内容
我正在尝试在 Kafka Streams 中创建一个 leftJoin,它可以正常工作大约 10 条记录,然后它崩溃了,由 NullPointerException 引起的异常使用这样的代码: 私有静态 KafkaStreams getKafkaStreams() {StreamsConfig StreamsConfig = new StreamsConfig(getProperties());
..
我有一个名为 A 的 Kafka 主题. 主题A中的数据格式为: { id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}{ id: 2, name:confluent, created_at:2017-09-28 22:00:00.000}{ id: 3, name:kafka, created_at:2017-0
..
我有两个 Java 应用程序(App1、App2)来测试如何在 docker 的单个实例环境中从不同的应用程序访问 KTable. 第一个应用程序 (App1) 使用以下代码写入 KTable. public static void main(String[] args){最终属性道具 = 新属性();props.put(StreamsConfig.APPLICATION_ID_CONFI
..
我有以下拓扑: 创建一个状态存储 根据 SOME_CONDITION 过滤记录,将其值映射到新实体,最后将这些记录发布到另一个主题 STATIONS_LOW_CAPACITY_TOPIC 但是我在 STATIONS_LOW_CAPACITY_TOPIC 上看到了这个: null空空{"id":140,"latitude":"40.4592351","longitude":"-3
..
我正在每 5 分钟推进一次的 4 小时窗口上进行跳跃窗口聚合.由于跳跃窗口重叠,我得到了具有不同聚合值的重复键. TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60* 1000L) 如何消除具有重复数据的重复键或仅选择包含最新值的键. 解决方案 2021 年 5 月更新: Kafka Streams API 支持 “final"现在窗口结
..
我有一个 kafka 流应用程序,它具有 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class); 或 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class); 在 kafka 2.4 版本中即使使用
..
我使用的是 Kafka 流 2.2.1. 我使用抑制来阻止事件直到窗口关闭.我正在使用事件时间语义.但是,只有在流上有新消息可用时才会触发触发消息. 提取以下代码对问题进行采样: KStream[] 分支 = 是.branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),(键,味精)->"b".equalsIgnore
..
类似于:更改 spring-cloud-stream 实例索引/运行时计数 我在微服务器架构中对批处理的启动做了一个 poc,我正在将 Spring 批处理与 Spring Cloud Stream Kafka 一起使用.我正在寻找一种方法来动态创建消费者(处理器)应用程序的多个实例.我看到可以用 定义多个实例 spring.cloud.stream.instanceCount=nspr
..
我对以下拓扑的行为有一些疑问: String topic = config.topic();KTablemyTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);//接收各种事件流拓扑.eventsStream()//只处理实现 MyEvent 的事件.filter((k,
..
据我阅读 Kafka Streams 文档后的理解,不可能将它用于仅从给定主题的一个分区流式传输数据,人们总是必须完整阅读. 正确吗? 如果是,未来是否有计划为 API 提供这样的选项? 解决方案 不,你不能这样做,因为内部消费者订阅主题,加入通过 application-id 指定的消费者组,所以分区是自动分配的.顺便说一句,你为什么要这样做?如果不重新平衡,您将失去 Kaf
..
我的 Kafka 主题包含由 deviceId 键控的状态.我想使用 KStreamBuilder.stream().groupByKey().aggregate(...) 只在 TimeWindow 中保留状态的最新值.我猜想,只要主题按键分区,聚合函数总是可以以这种方式返回最新值: (key, value, old_value) ->价值 这是我可以从 Kafka Streams
..
所以我正在尝试使用 Kafka 流进行交互式查询.我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上).我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和 Kafka. 我已经设置了这样的主题 kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 -
..
我正在使用以下 log4j.properties log4j.rootLogger=DEBUG,标准输出log4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.Target=System.outlog4j.appender.stdout.layout=org.apache.log4j.PatternL
..
我将 Kafka Stream 库用于流式应用程序.我想设置 kafka 消费者组 ID.然后,我把 Kafka 流配置如下. streamCopnfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "JoinTestApp");streamCopnfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "
..
背景 多台机器生成事件.这些事件被发送到我们的 Kafka 集群,其中每台机器都有自己的主题 (app.machine-events.machine-name).因为在每台机器的基础上顺序很重要,而且分区大小现在不是问题,所以所有主题都由一个分区组成.因此,N 个主题目前也意味着 N 个分区. 消费/处理应用程序使用 kafka-streams,我们给了 StreamsConfig.A
..
我想做的是: 使用主题中的记录 计算每 1 秒窗口的值 检测记录数 的窗口4 将最终结果发送到另一个主题 我使用抑制来发送最终结果,但出现了这样的错误. 09:18:07,963 错误 org.apache.kafka.streams.processor.internals.ProcessorStateManager- 任务 [1_0] 未能刷新状态存储 KSTREAM-AGG
..
我在 Spring Boot 中使用 Kafka Streams.在我的用例中,当我从其他微服务接收客户事件时,我需要存储在 customer 物化视图中,当我收到 order 事件时,我需要加入客户并订购然后存储在客户订单物化视图中.为了实现这一点,我创建了持久键值存储 customer-store 和 当有新事件发生时更新它. StoreBuilder customerStateStore
..
这是一个后续问题:Kafka Streams- 如何扩展 Kafka 存储生成的变更日志主题 让我们假设流消费者需要在存储数据之前进行一些转换(通过 v->k 而不是 k->v 索引). 最后,目标是每个消费者都需要将完整的转换记录集 (v->k) 存储在 RocksDB 中.我知道上游的另一个处理器可以根据 k->v 来处理生成 v->k 并且最终消费者可以简单地从全局表中具体化新主
..
我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策. 假设我有两个不同的事件要放入 KTable 中.我有一个生产者将这些消息发送到正在侦听该主题的 KStream. 据我所知,我不能对使用 KafkaStreams 的消息使用条件转发,所以如果流订阅了许多主题(例如,上述每个消息一个),我只能调用stream.to 在单个接收器主题上 - 否则,我将不得不在流
..
假设我们在 2 台不同的机器(实例)上启动了 2 个流任务,具有以下属性:- public final static String applicationID =“StreamsPOC";public final static String bootstrapServers = "10.21.22.56:9093";public final static String topicname =“T
..