apache-kafka-streams相关内容

未指定默认 serdes 并使用自定义 serdes 时,对 KStream 的映射操作失败 ->org.apache.kafka.streams.errors.StreamsException

因为我使用的是 Json 值,所以我没有设置默认的 serdes. 我处理了一个 KStream,使用必要的 spring 和产品 (json) serdes 来使用它,但是下一步(映射操作)失败了: val props = Properties()props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName道具[StreamsC ..
发布时间:2021-11-12 03:45:24 其他开发

为什么在重新启动时不会填写具有内存中键值存储的 GlobalKTable?

我试图弄清楚 GlobalKTable 是如何工作的,并注意到我的内存键值存储在重新启动时没有填充.然而,文档听起来它是在重新启动的情况下填充的,因为整个数据在客户端上是重复的. 当我调试我的应用程序时,看到 /tmp/kafka-streams/category-client-1/global/.checkpoint 上有一个文件,它包含一个关于我的主题的偏移量.这对于保留其数据并改进重新 ..
发布时间:2021-11-12 03:45:21 其他开发

如果我在 Kafka 中有事务性生产者,我可以使用 Kafka Streams 读取一次消息吗?

我想拥有 Exactly-once 语义,但我不想使用 Consumer 读取消息.我宁愿使用 Kafka Streams AP 阅读消息.如果我将 processing.guarantee=exactly_once 添加到 Stream 配置中,是否会保留恰好一次语义? 解决方案 Exactly-once 处理基于读-处理-写模式.Kafka Streams 使用这种模式,因此,如果您编 ..

Kafka 流 - 第一个示例 WordCount 未正确计算第一圈

我正在研究 Kafka Streams,但我对 Java 8 中 WordCount 的第一个示例有疑问,该示例取自文档. 使用最新版本的 kafka 流、Kafka Connect 和 WordCount lambda 表达式示例. 我遵循以下步骤:我在 Kafka 中创建了一个输入主题和一个输出主题.启动应用程序流,然后通过从 .txt 文件中插入一些单词来上传输入主题 在第 ..

全局状态存储不创建更改日志主题如果全局存储的输入主题具有空键,有什么解决方法?

我读了很多关于全局状态存储的信息,它不会为恢复创建更改主题主题,而是使用源主题作为恢复. 我正在创建自定义密钥并将数据存储在全局状态存储中,但在重新启动后它将消失,因为还原时的全局存储将直接从源主题获取数据并绕过处理器. 我输入的主题有以上数据. {"id": "user-12345",“用户客户端":["clientid-1",“clientid-2"]} 我维护两个状态存储如下 ..
发布时间:2021-11-12 03:45:12 Java开发

流数据处理和纳秒时间分辨率

我刚刚开始讨论实时流数据处理框架的话题,我有一个问题,目前我还没有找到任何决定性的答案: 通常的怀疑对象(Apache 的 Spark、Kafka、Storm、Flink 等)是否支持以 纳秒(甚至皮秒)的事件时间分辨率处理数据? 大多数人和文档都谈论毫秒或微秒分辨率,但我无法找到明确的答案,如果可能有更多分辨率或问题.我推断唯一具有此功能的框架是 influxData 的 Kapac ..

重新启动我的 Kafka Streams 应用程序时出现 OutOfMemoryError

我有一个 Kafka Streams 应用程序(Kafka Streams 2.1 + Kafka broker 2.0),它基于 TimeWindows 进行聚合,我使用抑制运算符来抑制结果的输出. 一切正常,直到我重新启动我的应用程序,它会将 KTABLE-SUPPRESS-STATE-STORE 的偏移量重置为 0 以恢复抑制状态,正如预期的那样.但是每次重启都会抛出一个OutOfMe ..
发布时间:2021-11-12 03:44:55 Java开发

我有一个用于 kafka 连接的 kafka 管道(json 问题更新)

所以我根据一些建议进行了更新.但流应用程序在一段时间后终止.没有表演.ide 显示的以下代码中没有错误.最后,我将数据发送到主题,因为键等于字符串,值作为 json 对象.还是不行. 我猜它是一条线或其他东西,但不确定我是否正确.请.还附上了下面的错误截图. SerializerjsonSerializer = 新的 JsonSerializer();解串器jsonDeseriali ..
发布时间:2021-11-12 03:44:52 Java开发

如何使 Serdes 与多步 kafka 流一起工作

我是 Kafka 的新手,我正在构建一个使用 Twitter API 作为数据源的入门项目.我创建了一个生产者,它可以查询 Twitter API,并使用字符串序列化器将数据发送到我的 kafka 主题,用于键和值.我的 Kafka Stream 应用程序读取这些数据并进行字数统计,但也按推文的日期进行分组.这部分是通过名为 wordCounts 的 KTable 完成的,以利用其 upsert ..
发布时间:2021-11-12 03:44:47 其他开发

Kafka Stream 中的分区策略

Kafka 流使用哪种分区策略?我们可以在Kafka Stream中更改分区策略,就像我们可以在普通Kafka Consumer中更改一样 streamsConfiguration.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,Collections.singletonList(StickyAssignor.class)); 没 ..
发布时间:2021-11-12 03:44:44 其他开发

从 kafka 流中阅读 peek 主题

我有一个主题名称,它是 push-processing-KSTREAM-PEEK-0000000014-repartition,这是 kafka 的内部主题.我没有创建这个主题,我在重新分区后使用 .peek() 方法并使用 peek 方法 3-4 次. 我的问题是我可以从主题topic read push-processing-KSTREAM-PEEK-0000000014-reparti ..
发布时间:2021-11-12 03:44:32 其他开发

kafka-streams 间歇性 isDisconnected

我的日志中有一个间歇性问题. 似乎心线一直在挣扎和获取 发送获取请求时出错org.apache.kafka.common.errors.DisconnectException由于以下原因,组协调器 xxx 不可用或无效:协调器不可用.isDisconnected:true.将尝试重新发现. 我已经把 heartbeat.interval.ms 变大了一点,但这种情况仍在发生. 我想 ..
发布时间:2021-11-12 03:44:29 其他开发

如何测试 WindowStore 保留期?

我正在尝试对传入的 kafka 消息进行重复数据删除(我正在轮询一个数据源,该数据源可以在第二天提供给定日期的所有数据点,但时间不一致,所以我每 x 分钟轮询一次,我想要对数据点进行重复数据删除,以获得仅包含新点的干净下游主题). 为此,我构建了一个自定义转换器,它依赖于商店来跟踪哪个“点"是已经被处理了.由于数据点的日期时间是重复数据删除键的一部分,我有一组无限的键,所以我不能依赖简单的 ..
发布时间:2021-11-12 03:44:23 其他开发

是否可以使用 Spring Cloud Stream 进行一次处理?

目前我正在使用 SCS 和几乎默认配置的微服务之间发送和接收消息.不知怎的,我读过这个 https://www.confluent.io/blog/enabling-exactly-kafka-流 并想知道如果我们只是通过 Spring Boot 应用程序中的属性将名为“processing.guarantee"且值为“exactly-once"的属性放在那里,它会起作用吗? 解决方 ..