spring-cloud-stream相关内容
我有一个具有函数式编程风格的 Spring Cloud Kafka Streams 的工作设置.有两个用例,通过 application.properties 进行配置.它们都单独工作,但是一旦我同时激活它们,我就会收到第二个用例的输出流的序列化错误: 线程异常“ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1
..
是否可以在 Spring Cloud Stream 中使用带有 @EnableBinding 批注的类或在带有 @StreamListener 的方法中使用交互式查询 (InteractiveQueryService)?我尝试在提供的 KStreamMusicSampleApplication 类和处理方法,但始终为空. 我的@StreamListener 方法正在侦听一堆 KTable 和
..
我有一个应用程序,它从 Kafka 获取消息并调用目标系统来更新旧的 Oracle 数据库. 我想启用一个场景,如果目标系统停机,将消息留在 Kafka 总线上,并且在给定的时间段内不处理它们.我正在考虑一些基于断路器 Hystrix 的解决方案,但我找不到任何机制来告诉 Spring Cloud Stream“停止"事件侦听.我能想到的唯一另一种选择是,如果断路器打开,将这些消息传输到错误
..
我有一个使用 Spring Cloud Stream (v1.3.0) 和 Kafka (v1.1.6) 的 Spring Boot (v.1.57) 应用程序.我希望能够优雅地关闭它,即在关闭时,所有流侦听器(即用 @StreamListener 注释)应该: 停止轮询新消息 完成工作 将偏移量提交给 Kafka 我注意到 ContainerProperties 中有一个名为“s
..
我正在使用 spring-cloud-stream kafka 绑定器来使用来自 kafka 主题的消息.源系统以 ascii 格式发送 json 消息.当我的消费者收听它抛出的主题时 o.s.c.s.b.k.KafkaMessageChannelBinder:无法转换消息:7B22736.. 我可以在 .yml 文件中设置任何属性来反序列化它吗?或者有没有我可以研究的例子? 解决方案
..
嗨,我正在为 Kafka 试用最新的 Spring 云流框架.但是,对于 String 和 Double 其工作正常,但是当我尝试发送 Java POJO 类时,它会引发以下异常. 我尝试了各种用于序列化和反序列化的配置,但似乎没有任何效果.我能够以 json 格式从供应商处生成消息,但由于错误,消费者无法处理它. 对此问题的任何建议将不胜感激.谢谢 org.springframew
..
我们有一个微服务,它使用 spring-boot 和 spring-cloud-stream 从 Kafka 生成和使用消息. 版本: 弹簧引导:1.5.8.RELEASE spring-cloud-stream:Ditmars.RELEASE 卡夫卡服务器:kafka_2.11-1.0.0 编辑:我们正在使用 3 个 Kafka 节点的 StatefulSets 集群和 3 个 Z
..
我在使用 Spring Cloud Stream 时遇到问题.问题是我有一个 bean,它会在创建后立即写入 Kafka(使用 @PostConstruct 注释的方法),因此我自动装配了适当的 MessageChannel 并在 application.yml 中设置了目标和绑定器属性.它是这样的: @Component@RequiredArgsConstructor公开课发件人{私有最终 M
..
我在使用 Spring Cloud Stream 时遇到问题.问题是我有一个 bean,它会在创建后立即写入 Kafka(使用 @PostConstruct 注释的方法),因此我自动装配了适当的 MessageChannel 并在 application.yml 中设置了目标和绑定器属性.它是这样的: @Component@RequiredArgsConstructor公开课发件人{私有最终 M
..
我是 Kafka Streams 和 Spring Cloud Stream 的新手,但在将集成相关代码移动到属性文件方面阅读了有关它的好消息,因此开发人员可以主要关注事物的业务逻辑方面. 这里有我的简单应用程序类. package com.some.events.consumer导入 com.some.events.SomeEvent导入 org.apache.kafka.streams
..
我想使用 kafka 在 Spring Cloud Stream 中管理一个 DLQ. application.yaml 服务器:端口:8091尤里卡:客户:服务网址:默认区域:http://IP:8761/eureka春天:应用:名称:员工-消费者云:溪流:卡夫卡:粘合剂:经纪人:IP:9092绑定:问候语:目的地:问候内容类型:应用程序/json问候:目的地:问候内容类型:应用程序/j
..
我想要实现的场景是消费来自 Kafka 的消息,处理它,如果某些条件失败,我不想确认该消息.为此,我在 spring 云流参考文档中找到了, 自动提交偏移量处理消息后是否自动提交偏移量.如果设置为 false,则消息头中将提供一个确认头用于延迟确认. 默认值:true. 我的问题是将 autoCommitOffset 设置为 false 后,我如何确认消息?非常感谢代码示例.
..
我正在尝试设置一个 Spring Boot 应用程序来监听 Kafka. 我正在使用 Kafka Streams Binder. 用一个简单的@EnableBinding @EnableBinding(StreamExample.StreamProcessor.class)公共类流示例{@StreamListener(StreamProcessor.INPUT)@SendTo(Str
..
我基于 Spring Cloud Stream 3.0.3.RELEASE 开发了 kafka 消费者应用程序.(SpringBoot 2.3.4.RELEASE) 当我停止这个应用程序时,我希望消费者优雅地关闭.类似问题 停止轮询新消息 完成他们的工作并将偏移量提交给 Kafka 正常关闭应用程序 spring cloud 流默认这个能用吗?那有没有相关的文档? 供您
..
我正在尝试实现一个事件驱动架构来处理分布式事务.每个服务都有自己的数据库,并使用 Kafka 发送消息通知其他微服务有关操作. 示例: 订购服务 ------->|卡夫卡|------->支付服务||订单 MariaDB DB 付款 MariaDB 数据库 订单收到订单请求.它必须将新订单存储在其数据库中并发布一条消息,以便支付服务意识到它必须为该项目收费: 私人订单业务订单业务
..
我们使用的是 spring cloude stream 2.0 &Kafka 作为消息代理. 对于目标系统(数据库或第 3 方 API)不可用的情况,我们已经实现了一个可以停止应用程序上下文的断路器,如下所示:当目标系统关闭时停止Spring Cloud Stream @StreamListener 监听 现在在 spring cloud stream 2.0 中有一种方法可以使用执行器来
..
你能告诉我如何停止发送到我的第三个 kafka 主题,当控件到达 catch 块时,当前消息被发送到错误主题以及在正常情况下应该发送到的主题加工.代码片段如下: @Component公共类 Abc {私有最终 StreamBridge streamBridge;公共 Abc (StreamBridge streamBridge)this.streamBridge = 流桥;@豆角,扁豆公共函数
..
当从方法返回值生成消息时,有没有办法配置默认的Message标头: @Publisher(channel = "theChannelname")公共 MyObject someMethod(Object param) {...返回我的对象;} 或 @SendTo("theChannelname")公共 MyObject someMethod(Object param) {...返回我的对
..
首先,感谢 Spring 团队为推动这项工作所做的所有工作! 现在Camden.SR5正式版了,我有一些问题在使用Spring Boot 1.5.1时如何正确配置spring cloud stream kafka binder. Spring Boot 1.5.1 对 kafka 进行了自动配置,而这些配置选项似乎与 Spring Cloud Stream kafka binder 中
..
我正在尝试按照 这里. 我尝试尽可能接近地从文档中复制配置示例,但创建的死信队列本身并未绑定回 DLX.我不清楚为什么不. 虽然我看到了另一个潜在的解决方案,但我没有依赖默认行为,而是尝试显式设置 dlqDeadLetterExchange 和 dlqDeadLetterRoutingKey 属性,以查看是否可以使其工作.我的配置是这样的: 兔子:绑定:输入:消费者:autoBind
..