apache-kafka-streams相关内容
我正在尝试访问我在同一个 java 程序中创建的 inMemoryStore.但是返回一个异常作为“线程“main"中的异常 org.apache.kafka.streams.errors.InvalidStateStoreException:状态存储 storeName 可能已迁移到另一个实例." 当我使用persistentKeyValueStore 时它工作正常并且能够创建存储并返回值
..
我做错了什么,我下面的 kafka 流程序在传输数据时出现问题,“无法从 START_ARRAY 令牌中反序列化 com.kafka.productiontest.models.TimeOff 的实例". 我有一个主题 timeOffs2,其中包含带键 timeOffID 的休假信息,值为包含 employeeId 的对象类型.我只想将员工密钥的所有休假时间分组并写入商店. 对于 st
..
是否可以记录来自流函数的入站消息?是否有某种拦截器可以让我这样做? 解决方案 使用 KStream.peek(StreamLogger::in) 方法记录传入的字符串 :)
..
我们有一个使用处理器 API 构建的 Kafka Streams Java 拓扑. 在拓扑中,我们有一个处理器,可以保存到多个状态存储. 当我们使用 at_least_once 时,我们希望看到状态存储之间存在一些不一致 - 例如传入的记录会导致写入状态存储 A 和 B,但两次保存之间的崩溃只会导致存储 A 的保存写入 Kafka 更改日志主题. 我们是否保证我们保存的顺序也将是
..
我正在尝试使用函数“mapValues"将在我的原始流“textlines"中获得的字符串值转换为 newStream.然后将我在 newStream 中获得的任何内容流式传输到名为“testoutput"的主题上.但是每次消息实际上通过转换块时,我都会收到一个 NullPointerException,错误仅指向 kafka 流库.不知道发生了什么:(( 附言当我从原始流分叉/创建新的 k
..
我正在使用 Spring Cloud Stream 处理 Kafka Streams.在消息处理应用程序中,有可能会产生错误.所以消息不应该被提交并再次重试. 我的申请方法- @Beanpublic Function, KStream>过程() {返回(输入)->{KStreamkt = input.flatMap
..
我正在使用在 map/foreach 期间调用外部系统的 Kafka Streams.map 或 foreach 可以花费多长时间? 长时间(比如几个小时)阻塞有什么注意事项吗? 解决方案 map/foreach 没有超时. 然而,由于 Kafka Streams 在内部使用 KafkaConsumer 和 KafkaProducer,它们的所有超时都适用(例如 max.pol
..
我正在尝试在 Kafka DSL 上使用 addGlobalStore,其中需要存储一些我需要对所有线程/实例进行全局访问的值. 我的问题是我需要定期更新拓扑中的这些值并使所有正在运行的线程都知道新值. 我通过 builder.addGlobalStore 并使用处理器的 init() 函数初始化了全局存储,该函数用作此函数的最后一个参数,但我不能找到一种方法来更新全局存储中的值.
..
我使用 confluent 的人工制品开始了一个简单的项目.但是当我尝试使用 mvn clean install 或 mvn clean package 编译项目时,我收到类似 的消息 [错误] C:\Users\its\git\datakmu\service-stack\stream-data-preprocessor\src\main\java\at\ac\fhsalzburg\datak
..
是否可以在 Spring Cloud Stream 中使用带有 @EnableBinding 批注的类或在带有 @StreamListener 的方法中使用交互式查询 (InteractiveQueryService)?我尝试在提供的 KStreamMusicSampleApplication 类和处理方法,但始终为空. 我的@StreamListener 方法正在侦听一堆 KTable 和
..
启动 Stream 应用程序(使用 Kafka Streams)失败并显示“java.lang.IllegalStateException:这不应该发生,因为 headers() 应该只在处理记录时调用" 如果主题中已有数据,这似乎只会在我启动应用程序时发生.如果主题为空并且我开始向其推送数据,则一切正常. 有人知道为什么会这样吗? 谢谢 这不应该发生,因为 headers()
..
我们有多个应用程序消费者监听同一个 kafka 主题,生产者在向主题发送消息时设置消息头,以便特定实例可以评估头并处理消息.例如 @StreamListener(target=ITestSink.CHANNEL_NAME,condition=“headers['franchiseName'] == 'sydney'")public void fullfillOrder(@Payload Test
..
我们有一个 kafka 实例,大约有 5000 万条记录,每天大约有 10 万条输入,所以在 kafka 世界中没有什么疯狂的.当我们想用一个更复杂的流应用程序(具有许多不同的聚合步骤)重新处理这些记录时,磁盘使用率会因重新分区主题而变得非常疯狂.根据我们的理解,这些主题使用 kafka-streams 1.0.1 中的标准保留时间(14 天?)和 2.1.1 中的 Long.Max.这是非常不方
..
我有一个带有 2 个主题的 Spring Boot kafka 流应用程序,考虑主题 A 和 B.主题 A 有 16 个分区,主题 B 有 1 个分区.考虑将应用程序部署在具有 num.stream.threads=16.我运行 kafka-consumer-groups.bat 命令来检查线程如何分配给组中的分区,得到以下输出.主题 A 和 B 分配了 16 个线程,其中主题 B 中的 14 个
..
我希望能够根据消息键的键将 Kafkastream 中的所有记录发送到不同的主题.前任.Kafka 中的流包含名称作为键和记录作为值.我想根据记录的键将这些记录扇出到不同的主题 data : (jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}),预期
..
我目前正在通过扩展 RocksDBConfigSetter 接口在我的流应用程序中使用自定义 RocksDB 配置.我看到有关关闭 cache & 的冲突文档writeBufferManager 实例. 现在,我看到 javadoc &文档页面之一 建议我们需要关闭所有扩展 RocksObject 的实例(Cache 和 WriteBufferManager 实例都扩展了这个类)RocksD
..
我一直在检查 Kafka 流.我一直在为 Kafka 流测试以下代码 生产者主题:(这是第一个生产者主题 - 发送以下 json 数据) KafkaProducer生产者 = 新的 KafkaProducer(特性);producer.send(new ProducerRecord(topic, jsonobject.toString()));生产者.close(); JSON -
..
假设我有2种类型的日志,它们有一个共同的字段'uid',如果这2个包含uid的日志的日志都到了,我想输出日志,就像一个join,kafka可以吗? 解决方案 是的,绝对的.查看 Kafka Streams,特别是 DSL API.它是这样的: StreamsBuilder builder = new StreamsBuilder();KStreamfooStream = builder
..
大家好, 我有一个要求,我需要重新提取一些旧数据.我们有一个多阶段管道,其来源是一个 Kafka 主题.一旦将记录输入其中,它就会运行一系列步骤(大约 10 个).每一步都会对推送到源主题的原始 JSON 对象进行按摩,然后推送到目标主题. 现在,有时,我们需要重新摄取旧数据并应用我上面描述的步骤的一个子集.我们打算将这些重新摄取记录推送到不同的主题,以免阻止通过的“实时"数据,这可能
..
我有 2 个单元测试 当我运行它们时出现以下错误 1) 测试 @Testpublic void simpleInsertAndOutputEventPrint() 抛出 IOException,URISyntaxException {GenericRecord 记录 = getInitialEvent();testDriver.pipeInput(recordFactory.cre
..