spring-cloud-stream相关内容

Spring Cloud Kafka:当两个处理器处于活动状态时,无法序列化输出流的数据

我有一个具有函数式编程风格的 Spring Cloud Kafka Streams 的工作设置.有两个用例,通过 application.properties 进行配置.它们都单独工作,但是一旦我同时激活它们,我就会收到第二个用例的输出流的序列化错误: 线程异常“ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1 ..
发布时间:2021-11-12 03:03:49 Java开发

如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询?

是否可以在 Spring Cloud Stream 中使用带有 @EnableBinding 批注的类或在带有 @StreamListener 的方法中使用交互式查询 (InteractiveQueryService)?我尝试在提供的 KStreamMusicSampleApplication 类和处理方法,但始终为空. 我的@StreamListener 方法正在侦听一堆 KTable 和 ..

当目标系统关闭时停止 Spring Cloud Stream @StreamListener 的监听

我有一个应用程序,它从 Kafka 获取消息并调用目标系统来更新旧的 Oracle 数据库. 我想启用一个场景,如果目标系统停机,将消息留在 Kafka 总线上,并且在给定的时间段内不处理它们.我正在考虑一些基于断路器 Hystrix 的解决方案,但我找不到任何机制来告诉 Spring Cloud Stream“停止"事件侦听.我能想到的唯一另一种选择是,如果断路器打开,将这些消息传输到错误 ..

将 Spring Cloud Stream 与 Kafka 一起使用时,如何正常关闭应用程序?

我有一个使用 Spring Cloud Stream (v1.3.0) 和 Kafka (v1.1.6) 的 Spring Boot (v.1.57) 应用程序.我希望能够优雅地关闭它,即在关闭时,所有流侦听器(即用 @StreamListener 注释)应该: 停止轮询新消息 完成工作 将偏移量提交给 Kafka 我注意到 ContainerProperties 中有一个名为“s ..
发布时间:2021-11-12 02:47:10 Java开发

在 spring-cloud-stream kafka binder 中接受二进制 json 消息的属性是什么

我正在使用 spring-cloud-stream kafka 绑定器来使用来自 kafka 主题的消息.源系统以 ascii 格式发送 json 消息.当我的消费者收听它抛出的主题时 o.s.c.s.b.k.KafkaMessageChannelBinder:无法转换消息:7B22736.. 我可以在 .yml 文件中设置任何属性来反序列化它吗?或者有没有我可以研究的例子? 解决方案 ..
发布时间:2021-11-12 02:41:24 其他开发

Spring Cloud Streams 应用程序抛出嵌套异常是域模型类的 java.lang.ClassCastException

嗨,我正在为 Kafka 试用最新的 Spring 云流框架.但是,对于 String 和 Double 其工作正常,但是当我尝试发送 Java POJO 类时,它会引发以下异常. 我尝试了各种用于序列化和反序列化的配置,但似乎没有任何效果.我能够以 json 格式从供应商处生成消息,但由于错误,消费者无法处理它. 对此问题的任何建议将不胜感激.谢谢 org.springframew ..

Kafka 消息被重新处理

我们有一个微服务,它使用 spring-boot 和 spring-cloud-stream 从 Kafka 生成和使用消息. 版本: 弹簧引导:1.5.8.RELEASE spring-cloud-stream:Ditmars.RELEASE 卡夫卡服务器:kafka_2.11-1.0.0 编辑:我们正在使用 3 个 Kafka 节点的 StatefulSets 集群和 3 个 Z ..
发布时间:2021-11-12 02:38:40 其他开发

Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:'listener'不能为空

我是 Kafka Streams 和 Spring Cloud Stream 的新手,但在将集成相关代码移动到属性文件方面阅读了有关它的好消息,因此开发人员可以主要关注事物的业务逻辑方面. 这里有我的简单应用程序类. package com.some.events.consumer导入 com.some.events.SomeEvent导入 org.apache.kafka.streams ..

在 Spring Cloud Stream Kafka 中正确管理 DLQ

我想使用 kafka 在 Spring Cloud Stream 中管理一个 DLQ. application.yaml 服务器:端口:8091尤里卡:客户:服务网址:默认区域:http://IP:8761/eureka春天:应用:名称:员工-消费者云:溪流:卡夫卡:粘合剂:经纪人:IP:9092绑定:问候语:目的地:问候内容类型:应用程序/json问候:目的地:问候内容类型:应用程序/j ..
发布时间:2021-11-12 02:29:40 其他开发

手动确认消息:Spring Cloud Stream Kafka

我想要实现的场景是消费来自 Kafka 的消息,处理它,如果某些条件失败,我不想确认该消息.为此,我在 spring 云流参考文档中找到了, 自动提交偏移量处理消息后是否自动提交偏移量.如果设置为 false,则消息头中将提供一个确认头用于延迟确认. 默认值:true. 我的问题是将 autoCommitOffset 设置为 false 后,我如何确认消息?非常感谢代码示例. ..
发布时间:2021-11-12 02:22:30 其他开发

使用 Spring Cloud Stream Kafka 3.0.3.RELEASE 时可以正常关闭吗?

我基于 Spring Cloud Stream 3.0.3.RELEASE 开发了 kafka 消费者应用程序.(SpringBoot 2.3.4.RELEASE) 当我停止这个应用程序时,我希望消费者优雅地关闭.类似问题 停止轮询新消息 完成他们的工作并将偏移量提交给 Kafka 正常关闭应用程序 spring cloud 流默认这个能用吗?那有没有相关的文档? 供您 ..
发布时间:2021-11-12 02:05:03 其他开发

如何使用 Spring Cloud Stream Kafka 和每个服务的数据库实现微服务事件驱动架构

我正在尝试实现一个事件驱动架构来处理分布式事务.每个服务都有自己的数据库,并使用 Kafka 发送消息通知其他微服务有关操作. 示例: 订购服务 ------->|卡夫卡|------->支付服务||订单 MariaDB DB 付款 MariaDB 数据库 订单收到订单请求.它必须将新订单存储在其数据库中并发布一条消息,以便支付服务意识到它必须为该项目收费: 私人订单业务订单业务 ..

Spring Cloud 流 kafka 暂停/恢复活页夹

我们使用的是 spring cloude stream 2.0 &Kafka 作为消息代理. 对于目标系统(数据库或第 3 方 API)不可用的情况,我们已经实现了一个可以停止应用程序上下文的断路器,如下所示:当目标系统关闭时停止Spring Cloud Stream @StreamListener 监听 现在在 spring cloud stream 2.0 中有一种方法可以使用执行器来 ..

当控制去捕获块时如何停止发送到 kafka 主题功能 kafka spring

你能告诉我如何停止发送到我的第三个 kafka 主题,当控件到达 catch 块时,当前消息被发送到错误主题以及在正常情况下应该发送到的主题加工.代码片段如下: @Component公共类 Abc {私有最终 StreamBridge streamBridge;公共 Abc (StreamBridge streamBridge)this.streamBridge = 流桥;@豆角,扁豆公共函数 ..

通过Rabbit DLQ延迟重新排队

我正在尝试按照 这里. 我尝试尽可能接近地从文档中复制配置示例,但创建的死信队列本身并未绑定回 DLX.我不清楚为什么不. 虽然我看到了另一个潜在的解决方案,但我没有依赖默认行为,而是尝试显式设置 dlqDeadLetterExchange 和 dlqDeadLetterRoutingKey 属性,以查看是否可以使其工作.我的配置是这样的: 兔子:绑定:输入:消费者:autoBind ..
发布时间:2021-08-20 20:11:36 其他开发