apache-kafka-streams相关内容

线程“main"中的异常org.apache.kafka.streams.errors.InvalidStateStoreException:

我正在尝试访问我在同一个 java 程序中创建的 inMemoryStore.但是返回一个异常作为“线程“main"中的异常 org.apache.kafka.streams.errors.InvalidStateStoreException:状态存储 storeName 可能已迁移到另一个实例." 当我使用persistentKeyValueStore 时它工作正常并且能够创建存储并返回值 ..
发布时间:2021-11-12 03:44:18 Java开发

无法反序列化实例 Kafka Streams

我做错了什么,我下面的 kafka 流程序在传输数据时出现问题,“无法从 START_ARRAY 令牌中反序列化 com.kafka.productiontest.models.TimeOff 的实例". 我有一个主题 timeOffs2,其中包含带键 timeOffID 的休假信息,值为包含 employeeId 的对象类型.我只想将员工密钥的所有休假时间分组并写入商店. 对于 st ..
发布时间:2021-11-12 03:44:15 Java开发

Kafka Streams:使用 at_least_once 时对状态存储的保存顺序有任何保证吗?

我们有一个使用处理器 API 构建的 Kafka Streams Java 拓扑. 在拓扑中,我们有一个处理器,可以保存到多个状态存储. 当我们使用 at_least_once 时,我们希望看到状态存储之间存在一些不一致 - 例如传入的记录会导致写入状态存储 A 和 B,但两次保存之间的崩溃只会导致存储 A 的保存写入 Kafka 更改日志主题. 我们是否保证我们保存的顺序也将是 ..
发布时间:2021-11-12 03:44:08 其他开发

如何转换/分叉 Kafka 流并将其发送到特定主题?

我正在尝试使用函数“mapValues"将在我的原始流“textlines"中获得的字符串值转换为 newStream.然后将我在 newStream 中获得的任何内容流式传输到名为“testoutput"的主题上.但是每次消息实际上通过转换块时,我都会收到一个 NullPointerException,错误仅指向 kafka 流库.不知道发生了什么:(( 附言当我从原始流分叉/创建新的 k ..

如果在处理步骤中发生故障,如何使 Spring Cloud 流 Kafka 流绑定器重试处理消息?

我正在使用 Spring Cloud Stream 处理 Kafka Streams.在消息处理应用程序中,有可能会产生错误.所以消息不应该被提交并再次重试. 我的申请方法- @Beanpublic Function, KStream>过程() {返回(输入)->{KStreamkt = input.flatMap ..

在 Kafka Streams 上写入 GlobalStateStore

我正在尝试在 Kafka DSL 上使用 addGlobalStore,其中需要存储一些我需要对所有线程/实例进行全局访问的值. 我的问题是我需要定期更新拓扑中的这些值并使所有正在运行的线程都知道新值. 我通过 builder.addGlobalStore 并使用处理器的 init() 函数初始化了全局存储,该函数用作此函数的最后一个参数,但我不能找到一种方法来更新全局存储中的值. ..
发布时间:2021-11-12 03:43:56 其他开发

无法编译 maven 项目

我使用 confluent 的人工制品开始了一个简单的项目.但是当我尝试使用 mvn clean install 或 mvn clean package 编译项目时,我收到类似 的消息 [错误] C:\Users\its\git\datakmu\service-stack\stream-data-preprocessor\src\main\java\at\ac\fhsalzburg\datak ..
发布时间:2021-11-12 03:43:53 Java开发

如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询?

是否可以在 Spring Cloud Stream 中使用带有 @EnableBinding 批注的类或在带有 @StreamListener 的方法中使用交互式查询 (InteractiveQueryService)?我尝试在提供的 KStreamMusicSampleApplication 类和处理方法,但始终为空. 我的@StreamListener 方法正在侦听一堆 KTable 和 ..

java.lang.IllegalStateException:这不应该发生,因为 headers() 应该只在处理记录时调用

启动 Stream 应用程序(使用 Kafka Streams)失败并显示“java.lang.IllegalStateException:这不应该发生,因为 headers() 应该只在处理记录时调用" 如果主题中已有数据,这似乎只会在我启动应用程序时发生.如果主题为空并且我开始向其推送数据,则一切正常. 有人知道为什么会这样吗? 谢谢 这不应该发生,因为 headers() ..
发布时间:2021-11-12 03:43:47 其他开发

Spring Cloud @StreamListener 条件已弃用什么是替代方案

我们有多个应用程序消费者监听同一个 kafka 主题,生产者在向主题发送消息时设置消息头,以便特定实例可以评估头并处理消息.例如 @StreamListener(target=ITestSink.CHANNEL_NAME,condition=“headers['franchiseName'] == 'sydney'")public void fullfillOrder(@Payload Test ..

Kafka Streams 删除消耗的重新分区记录,以减少磁盘使用

我们有一个 kafka 实例,大约有 5000 万条记录,每天大约有 10 万条输入,所以在 kafka 世界中没有什么疯狂的.当我们想用一个更复杂的流应用程序(具有许多不同的聚合步骤)重新处理这些记录时,磁盘使用率会因重新分区主题而变得非常疯狂.根据我们的理解,这些主题使用 kafka-streams 1.0.1 中的标准保留时间(14 天?)和 2.1.1 中的 Long.Max.这是非常不方 ..
发布时间:2021-11-12 03:43:38 其他开发

num.stream.threads 创建空闲线程

我有一个带有 2 个主题的 Spring Boot kafka 流应用程序,考虑主题 A 和 B.主题 A 有 16 个分区,主题 B 有 1 个分区.考虑将应用程序部署在具有 num.stream.threads=16.我运行 kafka-consumer-groups.bat 命令来检查线程如何分配给组中的分区,得到以下输出.主题 A 和 B 分配了 16 个线程,其中主题 B 中的 14 个 ..

在 kafka 流应用程序中关闭或不关闭 RocksDB Cache 和 WriteBufferManager

我目前正在通过扩展 RocksDBConfigSetter 接口在我的流应用程序中使用自定义 RocksDB 配置.我看到有关关闭 cache & 的冲突文档writeBufferManager 实例. 现在,我看到 javadoc &文档页面之一 建议我们需要关闭所有扩展 RocksObject 的实例(Cache 和 WriteBufferManager 实例都扩展了这个类)RocksD ..
发布时间:2021-11-12 03:43:30 Java开发

过滤 Kafka 流

我一直在检查 Kafka 流.我一直在为 Kafka 流测试以下代码 生产者主题:(这是第一个生产者主题 - 发送以下 json 数据) KafkaProducer生产者 = 新的 KafkaProducer(特性);producer.send(new ProducerRecord(topic, jsonobject.toString()));生产者.close(); JSON - ..
发布时间:2021-11-12 03:43:27 Java开发

如何在kafka中同步多个日志?

假设我有2种类型的日志,它们有一个共同的字段'uid',如果这2个包含uid的日志的日志都到了,我想输出日志,就像一个join,kafka可以吗? 解决方案 是的,绝对的.查看 Kafka Streams,特别是 DSL API.它是这样的: StreamsBuilder builder = new StreamsBuilder();KStreamfooStream = builder ..
发布时间:2021-11-12 03:43:24 其他开发

在运行时部署流处理拓扑?

大家好, 我有一个要求,我需要重新提取一些旧数据.我们有一个多阶段管道,其来源是一个 Kafka 主题.一旦将记录输入其中,它就会运行一系列步骤(大约 10 个).每一步都会对推送到源主题的原始 JSON 对象进行按摩,然后推送到目标主题. 现在,有时,我们需要重新摄取旧数据并应用我上面描述的步骤的一个子集.我们打算将这些重新摄取记录推送到不同的主题,以免阻止通过的“实时"数据,这可能 ..