apache-kafka-streams相关内容
因为我使用的是 Json 值,所以我没有设置默认的 serdes. 我处理了一个 KStream,使用必要的 spring 和产品 (json) serdes 来使用它,但是下一步(映射操作)失败了: val props = Properties()props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName道具[StreamsC
..
我试图弄清楚 GlobalKTable 是如何工作的,并注意到我的内存键值存储在重新启动时没有填充.然而,文档听起来它是在重新启动的情况下填充的,因为整个数据在客户端上是重复的. 当我调试我的应用程序时,看到 /tmp/kafka-streams/category-client-1/global/.checkpoint 上有一个文件,它包含一个关于我的主题的偏移量.这对于保留其数据并改进重新
..
我想拥有 Exactly-once 语义,但我不想使用 Consumer 读取消息.我宁愿使用 Kafka Streams AP 阅读消息.如果我将 processing.guarantee=exactly_once 添加到 Stream 配置中,是否会保留恰好一次语义? 解决方案 Exactly-once 处理基于读-处理-写模式.Kafka Streams 使用这种模式,因此,如果您编
..
我正在研究 Kafka Streams,但我对 Java 8 中 WordCount 的第一个示例有疑问,该示例取自文档. 使用最新版本的 kafka 流、Kafka Connect 和 WordCount lambda 表达式示例. 我遵循以下步骤:我在 Kafka 中创建了一个输入主题和一个输出主题.启动应用程序流,然后通过从 .txt 文件中插入一些单词来上传输入主题 在第
..
我读了很多关于全局状态存储的信息,它不会为恢复创建更改主题主题,而是使用源主题作为恢复. 我正在创建自定义密钥并将数据存储在全局状态存储中,但在重新启动后它将消失,因为还原时的全局存储将直接从源主题获取数据并绕过处理器. 我输入的主题有以上数据. {"id": "user-12345",“用户客户端":["clientid-1",“clientid-2"]} 我维护两个状态存储如下
..
我想做一个 KStream 到 KTable Join.使用 KTable 作为查找表.下面的步骤显示了代码执行的顺序 构建 KTable ReKey KTable 构建 KStream ReKey KStream 加入KStream - KTable 假设 KStream 中有 8000 条记录,KTable 中有 14 条记录,假设 KStreams 中的每
..
我正在尝试通过键将 KStream 与 GlobalKTable 连接起来,但具有特定的逻辑. StreamsBuilder builder = new StreamsBuilder();KStream流 = builder.stream(inputTopic1);//键 = "ABC"GlobalKTabletable = builder.globalTable(inputTopic2);/
..
我刚刚开始讨论实时流数据处理框架的话题,我有一个问题,目前我还没有找到任何决定性的答案: 通常的怀疑对象(Apache 的 Spark、Kafka、Storm、Flink 等)是否支持以 纳秒(甚至皮秒)的事件时间分辨率处理数据? 大多数人和文档都谈论毫秒或微秒分辨率,但我无法找到明确的答案,如果可能有更多分辨率或问题.我推断唯一具有此功能的框架是 influxData 的 Kapac
..
我有一个 Kafka Streams 应用程序(Kafka Streams 2.1 + Kafka broker 2.0),它基于 TimeWindows 进行聚合,我使用抑制运算符来抑制结果的输出. 一切正常,直到我重新启动我的应用程序,它会将 KTABLE-SUPPRESS-STATE-STORE 的偏移量重置为 0 以恢复抑制状态,正如预期的那样.但是每次重启都会抛出一个OutOfMe
..
所以我根据一些建议进行了更新.但流应用程序在一段时间后终止.没有表演.ide 显示的以下代码中没有错误.最后,我将数据发送到主题,因为键等于字符串,值作为 json 对象.还是不行. 我猜它是一条线或其他东西,但不确定我是否正确.请.还附上了下面的错误截图. SerializerjsonSerializer = 新的 JsonSerializer();解串器jsonDeseriali
..
我使用 Kafka Streams Binder 的 Spring Cloud Stream 应用程序具有以下属性: spring.cloud.stream.bindings:windowStream-in-0:目的地:输入windowStream-out-0:目的地:window1提示1Stream-in-0:目的地:window1提示1Stream-out-0:目的地:提示0 中的现实流:目
..
我是 Kafka 的新手,我正在构建一个使用 Twitter API 作为数据源的入门项目.我创建了一个生产者,它可以查询 Twitter API,并使用字符串序列化器将数据发送到我的 kafka 主题,用于键和值.我的 Kafka Stream 应用程序读取这些数据并进行字数统计,但也按推文的日期进行分组.这部分是通过名为 wordCounts 的 KTable 完成的,以利用其 upsert
..
Kafka 流使用哪种分区策略?我们可以在Kafka Stream中更改分区策略,就像我们可以在普通Kafka Consumer中更改一样 streamsConfiguration.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,Collections.singletonList(StickyAssignor.class)); 没
..
所使用的build.sbt文件如下: scalaVersion := "2.12.1"val kafka_streams_version = "2.0.0"libraryDependencies ++= Seq("org.apache.kafka" % "kafka-streams" % kafka_streams_version % 提供,"org.apache.kafka" % "kafka
..
我的流处理器中有一个包含多个分区的主题,我只想从一个分区流式传输该主题,并且无法弄清楚如何配置此 spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=s-processorspring.cloud.stream.bindings.input.destination=uinputspring.cloud.
..
我有一个主题名称,它是 push-processing-KSTREAM-PEEK-0000000014-repartition,这是 kafka 的内部主题.我没有创建这个主题,我在重新分区后使用 .peek() 方法并使用 peek 方法 3-4 次. 我的问题是我可以从主题topic read push-processing-KSTREAM-PEEK-0000000014-reparti
..
我的日志中有一个间歇性问题. 似乎心线一直在挣扎和获取 发送获取请求时出错org.apache.kafka.common.errors.DisconnectException由于以下原因,组协调器 xxx 不可用或无效:协调器不可用.isDisconnected:true.将尝试重新发现. 我已经把 heartbeat.interval.ms 变大了一点,但这种情况仍在发生. 我想
..
我在加入 2 个 kafka 流以从我的事件字段中提取日期时遇到问题.当我没有定义自定义 TimeStampExtractor 时,联接工作正常,但当我这样做时,联接不再起作用.我的拓扑很简单: val builder = new StreamsBuilder()valcouponConsumedWith = Consumed.`with`(Serdes.String(),getAvroCoup
..
我正在尝试对传入的 kafka 消息进行重复数据删除(我正在轮询一个数据源,该数据源可以在第二天提供给定日期的所有数据点,但时间不一致,所以我每 x 分钟轮询一次,我想要对数据点进行重复数据删除,以获得仅包含新点的干净下游主题). 为此,我构建了一个自定义转换器,它依赖于商店来跟踪哪个“点"是已经被处理了.由于数据点的日期时间是重复数据删除键的一部分,我有一组无限的键,所以我不能依赖简单的
..
目前我正在使用 SCS 和几乎默认配置的微服务之间发送和接收消息.不知怎的,我读过这个 https://www.confluent.io/blog/enabling-exactly-kafka-流 并想知道如果我们只是通过 Spring Boot 应用程序中的属性将名为“processing.guarantee"且值为“exactly-once"的属性放在那里,它会起作用吗? 解决方
..