spring-cloud-stream相关内容

如果在处理步骤中发生故障,如何使 Spring Cloud 流 Kafka 流绑定器重试处理消息?

我正在使用 Spring Cloud Stream 处理 Kafka Streams.在消息处理应用程序中,有可能会产生错误.所以消息不应该被提交并再次重试. 我的申请方法- @Beanpublic Function, KStream>过程() {返回(输入)->{KStreamkt = input.flatMap ..

如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询?

是否可以在 Spring Cloud Stream 中使用带有 @EnableBinding 批注的类或在带有 @StreamListener 的方法中使用交互式查询 (InteractiveQueryService)?我尝试在提供的 KStreamMusicSampleApplication 类和处理方法,但始终为空. 我的@StreamListener 方法正在侦听一堆 KTable 和 ..

Spring Cloud @StreamListener 条件已弃用什么是替代方案

我们有多个应用程序消费者监听同一个 kafka 主题,生产者在向主题发送消息时设置消息头,以便特定实例可以评估头并处理消息.例如 @StreamListener(target=ITestSink.CHANNEL_NAME,condition=“headers['franchiseName'] == 'sydney'")public void fullfillOrder(@Payload Test ..

num.stream.threads 创建空闲线程

我有一个带有 2 个主题的 Spring Boot kafka 流应用程序,考虑主题 A 和 B.主题 A 有 16 个分区,主题 B 有 1 个分区.考虑将应用程序部署在具有 num.stream.threads=16.我运行 kafka-consumer-groups.bat 命令来检查线程如何分配给组中的分区,得到以下输出.主题 A 和 B 分配了 16 个线程,其中主题 B 中的 14 个 ..

Kafka Streams:使用 Spring Cloud Stream 为每组主题定义多个 Kafka Streams

我正在尝试使用 Kafka Streams 做一个简单的 POC.但是,我在启动应用程序时遇到异常.我正在使用 Spring-Kafka、Kafka-Streams 2.5.1 和 Spring Boot 2.3.5Kafka流配置 @Configuration公共类 KafkaStreamsConfig {私有静态最终记录器日志 = LoggerFactory.getLogger(KafkaS ..

根据部分数据属性更新KTable

我正在尝试使用对象的部分数据更新 KTable.例如.用户对象是{"id":1, "name":"Joe", "age":28}对象被流式传输到一个主题中,并按密钥分组到 KTable 中.现在用户对象部分更新如下 {"id":1, "age":33} 并流式传输到表中.但更新后的表看起来如下 {"id":1, "name":null, "age":28}.预期输出为 {"id":1, "name ..

根据部分数据属性更新KTable

我正在尝试使用对象的部分数据更新 KTable.例如.用户对象是{"id":1, "name":"Joe", "age":28}对象被流式传输到一个主题中,并按密钥分组到 KTable 中.现在用户对象部分更新如下 {"id":1, "age":33} 并流式传输到表中.但更新后的表看起来如下 {"id":1, "name":null, "age":28}.预期输出为 {"id":1, "name ..

Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:'listener'不能为空

我是 Kafka Streams 和 Spring Cloud Stream 的新手,但在将集成相关代码移动到属性文件方面阅读了有关它的好消息,因此开发人员可以主要关注事物的业务逻辑方面. 这里有我的简单应用程序类. package com.some.events.consumer导入 com.some.events.SomeEvent导入 org.apache.kafka.streams ..

当控制去捕获块时如何停止发送到 kafka 主题功能 kafka spring

你能告诉我如何停止发送到我的第三个 kafka 主题,当控件到达 catch 块时,当前消息被发送到错误主题以及在正常情况下应该发送到的主题加工.代码片段如下: @Component公共类 Abc {私有最终 StreamBridge streamBridge;公共 Abc (StreamBridge streamBridge)this.streamBridge = 流桥;@豆角,扁豆公共函数 ..

Confluent 模式注册表与 Spring 云模式注册表

我目前一直在研究 Spring Cloud 模式注册表和融合模式注册表.我可以看到一些差异,例如 spring 云模式注册表将模式保存在普通数据库中,默认情况下保存在 h2 中,而融合模式注册表保存在 kafka 主题中. 对于 Spring Cloud 模式注册表,这种方法是否有任何性能影响.据我所知,即使数据在融合的情况下保持在主题上,查询它仍然会延迟.但会产生重大影响吗? 我还看 ..

从 spring-cloud-stream 访问 kafka 生产者工厂

我有一个使用 spring-boot-cloud 和 apache-kafka 的项目,感谢 EmbeddedBroker,我有一个涵盖拓扑逻辑的集成测试列表. 我最近发现运行这些测试时日志中有很多噪音. 例如[Producer clientId=producer-2] 无法建立到节点 0 (localhost/127.0.0.1:63267) 的连接.经纪人可能不可用. 经过反 ..
发布时间:2021-11-12 03:17:28 其他开发

Confluent 模式注册表与 Spring 云模式注册表

我目前一直在研究 Spring Cloud 模式注册表和融合模式注册表.我可以看到一些差异,例如 spring 云模式注册表将模式保存在普通数据库中,默认情况下保存在 h2 中,而融合模式注册表保存在 kafka 主题中. 对于 Spring Cloud 模式注册表,这种方法是否有任何性能影响.据我所知,即使数据在融合的情况下保持在主题上,查询它仍然会延迟.但会产生重大影响吗? 我还看 ..

从 spring-cloud-stream 访问 kafka 生产者工厂

我有一个使用 spring-boot-cloud 和 apache-kafka 的项目,感谢 EmbeddedBroker,我有一个涵盖拓扑逻辑的集成测试列表. 我最近发现运行这些测试时日志中有很多噪音. 例如[Producer clientId=producer-2] 无法建立到节点 0 (localhost/127.0.0.1:63267) 的连接.经纪人可能不可用. 经过反 ..
发布时间:2021-11-12 03:14:39 其他开发

从 spring-cloud-stream 访问 kafka 生产者工厂

我有一个使用 spring-boot-cloud 和 apache-kafka 的项目,感谢 EmbeddedBroker,我有一个涵盖拓扑逻辑的集成测试列表. 我最近发现运行这些测试时日志中有很多噪音. 例如[Producer clientId=producer-2] 无法建立到节点 0 (localhost/127.0.0.1:63267) 的连接.经纪人可能不可用. 经过反 ..
发布时间:2021-11-12 03:11:58 其他开发

获取数据时出现意外错误代码 13 Spring-Cloud-Stream Kafka Azure 事件中心 -

它从 12 月 18 日开始突然出现这个问题弹出,找不到任何线索,在正常工作 6 个月后突然出现这个异常是什么原因. 我无法在本地独立复制,但它带有 微服务 的 docker 镜像,其中之一与 Azure 甚至集线器 通信.> 我搜索并找到了这个 发布.在这篇文章中还报告了我面临的同样问题,但找不到任何解决方案或线索,所以我发布了这个问题. 请参考下面的堆栈跟踪: 处理时出错:空 ..

Confluent 模式注册表与 Spring 云模式注册表

我目前一直在研究 Spring Cloud 模式注册表和融合模式注册表.我可以看到一些差异,例如 spring 云模式注册表将模式保存在普通数据库中,默认情况下保存在 h2 中,而融合模式注册表保存在 kafka 主题中. 对于 Spring Cloud 模式注册表,这种方法是否有任何性能影响.据我所知,即使数据在融合的情况下保持在主题上,查询它仍然会延迟.但会产生重大影响吗? 我还看 ..

如何配置 Spring Cloud Stream (Kafka) 应用程序以在 Confluent Cloud 中自动创建主题?

有没有办法让(Spring Cloud Stream)应用程序在 Confluent Cloud 中自动创建他们需要的主题? 到目前为止,我不得不手动创建它们,当您考虑还必须设置变更日志主题时,这很容易出错. 解决方案 binder 属性 autoCreateTopics 是 默认为true.因此应该自动创建主题(除非代理权限不允许 - 但我希望在这种情况下会在日志中看到错误). ..