spring-cloud-stream相关内容

Spring Cloud Stream Kafka - 方法必须是声明性的

我已经使用 spring cloud stream 配置了一个基于 spring boot 的应用程序.我正在尝试使用 KStream,但我不断收到错误“java.lang.IllegalArgumentException:方法必须是声明性的".有人可以帮助我了解如何进行配置吗?我查看了 StreamListener 文档,但无法使其正常工作. https://docs.spring.io/ ..
发布时间:2021-08-20 18:58:28 其他开发

延迟处理死信队列

我想执行以下操作:当消息失败并落入我的死信队列时,我想等待 5 分钟并在我的队列中重新发布相同的消息. 今天,使用Spring Cloud Streams和RabbitMQ,我做了如下代码基于本文档: @Component公共类 HandlerDlq {私有静态最终记录器 LOGGER = LoggerFactory.getLogger(HandlerDlq.class);private ..
发布时间:2021-07-02 18:42:33 Java开发

如何查找绑定到部署在 Tanzu (Pivotal/PCF) 环境上的 SCDF 流的 RabbitMQ 服务器的连接信息?

这是当响应来自使用 Spring Integration DSL 的 rabbitMQ 回复队列时,如何实现 HTTP 请求/回复?. 我们能够在本地成功构建 Spring Integration 应用程序和 SCDF 流.我们可以向绑定到 SCDF 流兔源的 rabbitMQ 请求队列发送一个 http 请求.我们还可以从绑定到 SCDF 流兔接收器的 rabbitMQ 响应队列接收响应. ..

一个 SCDF 源,2 个处理器,但每个项目只有 1 个进程

我的用例是对此的一个变体: 创建Spring Cloud Data Flow 中的一个源、两个并行处理器和一个接收器的流 在示例中,1 个源向 rabbitmq 发出一个项目,两个处理器都得到它. 我想要相反的.我希望源将项目发送到 rabbitmq 但只有 1 个处理器处理每个项目. 假设我有: 1 个名为 source 的源2 个处理器名为 processor1 ..
发布时间:2021-07-02 18:41:50 其他开发

Spring Cloud Stream 中的事务

问题:我正在尝试逐行读取一个大文件并将消息放入 RabbitMQ.我想在文件末尾提交rabbitMQ.如果文件中有任何记录是坏的,那么我想撤消发布到队列中的消息. 技术:弹簧靴,春云流,兔MQ 你能帮我实现这个过渡的东西吗?我知道如何使用 Spring Cloud 流读取文件并发布到队列. 编辑: @Transactionalpublic void sendToQueue(L ..

RabbitMQ 本地替代方案

我正在提供新的软件设计,并将使用 RabbitMQ 作为消息总线以及 Spring Cloud Stream. 其中一个问题是开发者无法在本地机器上安装 Erlang 或 RabbitMQ. Spring Cloud 流不支持 ActiveMQ.无论如何,我可以在本地机器上安装队列并在 unix 环境中使用 RabbitMQ 时使用它吗? 解决方案 一个不错的选择是在本地机器上 ..
发布时间:2021-07-02 18:40:18 其他开发

当响应来自使用 Spring Integration DSL 的 rabbitMQ 回复队列时,如何实现 HTTP 请求/回复?

我正在尝试在 Spring Integration DSL 中使用单独的 RabbitMQ 队列来实现 HTTP 请求/回复.它类似于 Spring IntegrationFlow http 请求到 amqp 队列.不同之处在于我希望将响应返回给原始的 http 调用方.我可以看到测试 http post 消息成功传递到请求队列并转换(为大写)到响应队列.该消息也从响应队列中使用,但从未返回给调用 ..

Kafka Streams:使用Spring Cloud Stream为每组主题定义多个Kafka Streams

我正在尝试使用Kafka Streams做一个简单的POC.但是,启动应用程序时出现异常.我正在使用Spring Boot 2.3.5的Spring-Kafka,Kafka-Streams 2.5.1Kafka流配置 @Configuration公共类KafkaStreamsConfig {私有静态最终Logger日志= LoggerFactory.getLogger(KafkaStreams ..

Spring Cloud Kafka:当两个处理器处于活动状态时,无法序列化输出流的数据

我具有功能编程风格的Spring Cloud Kafka Streams的有效设置.有两种使用案例,它们是通过 application.properties 配置的.它们都可以单独工作,但是一旦我同时激活它们,就会收到第二个用例的输出流的序列化错误: 线程"ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1"中的 ..
发布时间:2021-04-08 19:05:40 Java开发

spring-cloud-stream-kafka活页夹中接受二进制json消息的属性是什么

我正在使用spring-cloud-stream kafka活页夹来使用来自kafka主题的消息.源系统正在以ascii发送json消息.当我的消费者收听主题时,它会引发 o.s.c.s.b.k.KafkaMessageChannelBinder:无法转换消息:7B22736 .. .yml文件中是否可以设置任何属性以反序列化它?还是有一个我可以研究的例子? 解决方案 当我添加 c ..
发布时间:2021-04-08 18:55:32 其他开发

Kafka消息被重新处理

我们有一个微服务,它使用spring-boot和spring-cloud-stream产生和使用来自Kafka的消息. 版本: 弹簧靴:1.5.8.RELEASE spring-cloud-stream:Ditmars.RELEASE Kafka服务器:kafka_2.11-1.0.0 编辑:我们正在使用3个Kafka节点的StatefulSets集群和3个Zookeeper节点的集 ..
发布时间:2021-04-08 18:55:22 其他开发

@PostConstruct并自动装配MessageChannel

我在Spring Cloud Stream上遇到问题.事实是,我有一个可在创建Kafka后立即将其写入Kafka(使用@PostConstruct注释的方法)的Bean,因此我自动连接了适当的MessageChannel并在application.yml中设置了目标和绑定器属性.它是这样的: @Component@RequiredArgsConstructor公共类发件人{私有的最终Messa ..
发布时间:2021-04-08 18:54:53 其他开发

手动确认消息:Spring Cloud Stream Kafka

我要实现的方案是消耗来自Kafka的消息,对其进行处理,如果某些条件失败,我不希望确认该消息.为此,我在Spring Cloud Stream参考文档中找到了 autoCommitOffset处理消息后是否自动提交偏移量.如果设置为false,则消息头中将提供一个Acknowledgment标头,以供以后确认. 默认:true. 我的问题是将autoCommitOffset设置为 ..
发布时间:2021-04-08 18:51:17 其他开发