apache-kafka-streams相关内容
我有一个简单的 KTable 定义来生成一个 Store: KTabletable = kStreamBuilder.table(ORDERS_TOPIC, ORDERS_STORE);table.print(); 我将消息发布到 ORDERS_TOPIC 中,但存储直到每 30 秒才真正更新一次.这是因为 30000 毫秒时间已经过去而出现提交消息的日志: 2017-07-25 23:53
..
我有一堆防火墙数据.我想: A) 将每个 IP 每小时的字节数相加,然后 B) 计算该小时内所有 IP 的最小和最大总和 我已经能够在 Kafka 中做 A,但是,我不知道如何做 B.我一直在研究文档,感觉自己快要接近了,但我似乎总是只找到了一部分解决方案. 我的 firewall_stream 运行良好. client.create_stream(table_name=
..
这是对我之前发送的有关 Kafka Streams 中的高延迟问题的跟进;(Kafka Streams 重新平衡高吞吐量 kafka 上的延迟峰值- 流服务). 提醒一下,我们的无状态服务有非常严格的延迟要求,而且我们面临着延迟过高的问题(有些消息在生成后消耗超过 10 秒),特别是当消费者优雅地离开组时. 经过进一步调查,我们发现至少对于小型消费群体而言,重新平衡的时间不到 500
..
我正在尝试从 KGroupedStream 创建一个 KTable 来存储每个键的值的总和. final StreamsBuilder builder = new StreamsBuilder();最终 KTablesum = builder.stream("streams-plaintext-input").groupByKey().aggregate(new Initializer() {
..
如何确定主题的 KTable 实现何时完成? 例如假设 KTable 有几百万行.伪代码如下: KTablekt = kgroupedStream.groupByKey(..).reduce(..);//假设这会产生几百万行 在某个时间点,我想安排一个线程来调用以下内容,写入主题:kt.toStream().to("output_topic_name"); 我想确保所有数据都作为上
..
我正在使用 Spark 2.2.0 和 kafka 0.10 spark-streaming 库来读取充满 Kafka-Streams scala 应用程序的主题.Kafka Broker 版本为 0.11,Kafka-streams 版本为 0.11.0.2. 当我在 Kafka-Stream 应用中设置 EXACTLY_ONCE 保证时: p.put(StreamsConfig.PR
..
假设我们有一个转换器(用 Scala 编写) new Transformer[String, V, (String, V)]() {var 上下文:ProcessorContext = _覆盖 def init(context: ProcessorContext): Unit = {this.context = 上下文}覆盖 def 变换(键:字符串,值:V):(字符串,V)= {val 时间戳
..
我使用 StreamsBuilder 的简单 API 来构建 GlobalKTable,如下所示: Materialized>物化 =物化.>as(this.categoryStoreName).withCachingDisabled().withKeySerde(Serdes.Long()).withValueSerde(CATEGORY_JSON_SERDE);返回streamsBuilde
..
我正在用 Java 编写一个 Kafka 流应用程序,它接受由连接器创建的输入主题,该连接器使用模式注册表和 avro 作为键和值转换器.连接器产生以下架构: key-schema: "int"价值模式:{"类型": "记录","name": "用户",“领域":[{"name": "firstname", "type": "string"},{"name": "lastname", "type
..
我想在同一个实例上运行 2 个拓扑.1个拓扑涉及状态存储,其他涉及全局存储.我如何成功地做到这一点? 我创建了 1 个包含 3 个分区的主题,然后在 1 个拓扑中添加了一个状态存储,在第二个拓扑中添加了一个全局存储. 拓扑 1: public void createTopology() {拓扑拓扑 = new Topology();topology.addSource("sourc
..
我想给我的消费者一些时间来重新启动,以免发生不必要的重新平衡.我怎样才能做到这一点?在关闭的情况下,我希望复制出现,一段时间后如果消费者没有备份,则应该发生重新平衡,否则不会发生. 解决方案 您可以调整名为 group.initial.rebalance.delay.ms 的代理级别配置. 群协调者等待更多消费者的时间在执行第一次重新平衡之前加入一个新组.更长的延迟意味着可能更少的重
..
我正在尝试使用对象的部分数据更新 KTable.例如.用户对象是{"id":1, "name":"Joe", "age":28}对象被流式传输到一个主题中,并按密钥分组到 KTable 中.现在用户对象部分更新如下 {"id":1, "age":33} 并流式传输到表中.但更新后的表看起来如下 {"id":1, "name":null, "age":28}.预期输出为 {"id":1, "name
..
我在尝试通过 Kafka Streams 实现以下目标时遇到了一些麻烦: 在应用程序启动时,(压缩的)主题 alpha 被加载到键值 StateStore 映射中 Kafka Stream 从另一个主题中消费,使用 (.get) 上面的映射,最后在主题中生成一条新记录 alpha 结果是内存映射应该与基础主题对齐,即使流媒体重新启动也是如此. 我的方法如下: val builde
..
我们使用 kafka 流 api 进行聚合,其中我们也使用 group by.我们还使用状态存储来保存输入主题数据. 我注意到的是 Kafka内部创建了3种topic Changelog-- 重新分区-- - 我无法理解的是 为什么当我拥有 - 中的所有数据时,它会创建变更日志主题重新分区主题是否包含分组后的数据. 而且我看到 Changelog 和 topi
..
使用 spring-boot-2.1.3、spring-kafka-2.2.4,我想有两个流配置(例如有不同的 application.id,或连接到不同的集群等).所以我几乎根据文档定义了第一个流配置,然后添加了第二个,具有不同的名称,以及第二个 StreamsBuilderFactoryBean(也具有不同的名称): @Bean(name = KafkaStreamsDefaultConfi
..
我喜欢 Kafka,但讨厌必须编写大量序列化器/反序列化器,所以我尝试创建一个可以反序列化泛型 T 的 GenericDeserializer. 这是我的尝试: class GenericDeserializer configs, boolean isKey) {}@覆盖公共 T 反序列化(字符串主题,字节 [] 数据){T 结果 = 空;尝试 {结果 = ( T )( objectMap
..
至少一次保证,我知道在失败的情况下有可能重复.然而, 1) Kafka Stream 库执行提交的频率如何? 2) 除了上述之外,用户是否还需要考虑提交? 3)是否有关于提交频率的最佳实践? 解决方案 Kafka Streams 定期提交,可以通过参数 commit.interval.ms 进行配置(默认为 30 秒;如果恰好-启用处理后,默认为 100 毫秒). 通常,用户不
..
我们开始使用 Kafka 流,我们的服务是一个非常简单的无状态消费者. 我们对延迟有严格的要求,当消费者群体重新平衡时,我们面临着过高的延迟问题.在我们的场景中,重新平衡会相对频繁地发生:滚动更新代码、扩展/缩减服务、容器被集群调度程序改组、容器死亡、硬件故障. 我们所做的第一个测试是让一个包含 4 个消费者的小型消费者组处理少量消息(1K/秒)并杀死其中一个;集群管理器(目前是 AW
..
我有一个简单的流应用程序,将一个主题作为输入流并将 KeyValues 转换为另一个主题,例如: StoreBuilder>建设者 =Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),Serdes.Long(), CATEGORY_JSON_SERDE);streamB
..
在我的公司,我们广泛使用 Kafka,但出于容错的原因,我们一直使用关系数据库来存储多个中间转换和聚合的结果.现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点.通常,我们的需求非常简单——其中一种情况是 监听、、、... 的输入队列对于每条记录,执行一些高延迟操作(调用远程服务) 如果在处理 时,并且 、 都已生成,则我应该处理 V3,因为 V2 已经过时了
..