spring-cloud-stream相关内容

将 Spring Cloud Stream 与 Kafka 一起使用时,如何正常关闭应用程序?

我有一个使用 Spring Cloud Stream (v1.3.0) 和 Kafka (v1.1.6) 的 Spring Boot (v.1.57) 应用程序.我希望能够优雅地关闭它,即在关闭时,所有流侦听器(即用 @StreamListener 注释)应该: 停止轮询新消息 完成工作 将偏移量提交给 Kafka 我注意到 ContainerProperties 中有一个名为“s ..
发布时间:2021-08-20 20:10:15 Java开发

如何在 Spring Cloud Stream 项目中将传入的标头映射为 String 而不是 byte[]?

我有一个简单的 Spring Cloud Stream 项目,它使用 Spring Integration DSL 流和 Kafka 绑定器.一切正常,但来自 Kafka 的消息头值以 byte[] 的形式到达. 这意味着我的 SI @Header 参数需要是 byte[] 类型.哪个有效,但最好将它们作为字符串(我关心的所有入站标头都是字符串值). 我已将 Kafka 客户端配置为使 ..

Spring Reactive Stream - 意外关闭

我们正在将 Spring Cloud Reactive Streams 与 RabbitMQ 结合使用. Spring Reactive Stream 似乎在将消息从队列中拉出时立即确认该消息.因此,在消息处理期间发生的任何错误未处理异常都需要在应用程序中进行处理(这与非反应流不同,非反应流可以抛出未处理的异常并拒绝消息,从而将其发送到死信队列). 当消息正在传输时,我们应该如何处理应 ..

使用 Spring-cloud-stream-kafka-stream 的活页夹问题

我正在尝试使用 spring 云流 kafka 流读取 kafka.然后我在一分钟的时间窗口内聚合事件并将其写入不同的主题.然后我需要从主题中读取聚合事件并将其写入不同的主题,同时将主题与另一个 kafka 集群中的不同主题绑定.但我得到了以下活页夹异常. org.springframework.context.ApplicationContextException: 无法启动 bean 'ou ..
发布时间:2021-08-20 20:10:03 其他开发

spring-cloud-stream 消息转换异常

在将我们的一项服务升级到 spring-cloud-stream 2.0.0.RC3 时,我们在尝试使用由使用旧版本 spring-cloud-stream - Ditmars.RELEASE: 错误 31241 --- [container-4-C-1] osintegration.handler.LoggingHandler:org.springframework.messaging.c ..
发布时间:2021-08-20 20:09:57 其他开发

spring.cloud.stream.kafka.binder.headers 未按预期工作

我正在尝试使用 spring.cloud.stream.kafka.binder.headers 来传输我根据之前的 问题. 我已阅读文档 哪里... spring.cloud.stream.kafka.binder.headers将由绑定器传输的自定义标头列表.默认值:空. 似乎暗示设置列表(逗号分隔?)会导致自定义标头在 Message 中传输,但是一旦 kafka 写入完成,标头就 ..
发布时间:2021-08-20 20:09:51 其他开发

spring-cloud-stream 请求-回复消息传递模式

是否有一种应与 spring-cloud-stream 一起使用的请求-回复模式?我可以在 spring-cloud-stream 上找到的所有文档都针对 MessageChannel.send 一劳永逸类型的生产者,我熟悉 spring-integration 中的 @MessagingGateway,但我不确定那会如何使用 spring-cloud-stream.当您有一个 REST POST ..
发布时间:2021-08-20 20:09:45 其他开发

LoadBalancing Spring 云数据流服务器

在 spring 云数据流中,根据我的理解,每个流都是一个微服务,但数据流服务器不是.我说得对吗? Spring Cloud Dataflow(SCDF) 服务器是否可以有多个实例?如何对数据流服务器进行负载平衡?我打算在 AWS 中部署它.官方文档没有提到有关数据流服务器负载平衡的任何内容.如果可能的话,仪表板、外壳如何工作? 解决方案 SCDF-server 是一个常规的 Spr ..

使用原始标头发布 null/tombstone 消息

我正在构建一个 Spring Cloud Stream Kafka 处理器应用程序,它将使用字符串键使用原始数据,有时使用来自 Kafka 主题的空负载.我想为另一个主题生成一个字符串键和空负载(在 Kafka 中称为墓碑).为了在消息上使用原始标头,我需要输出一个 byte[],但是如果我将 KafkaNull.INSTANCE 编码成一个 byte[]code> 它将逐字输出对象哈希码的字符串 ..

更改 RabbitMQ Spring Cloud Stream Starter App 的内容类型

Spring Cloud Stream Starter Apps 列出了几种可能的内容类型,每种类型的输出负载都有不同的结果类型.但是,它没有说明如何选择要使用的那个.我正在部署一个 Spring Cloud 数据流,将 Rabbit 源连接到一个日志接收器,我得到的只是字节数组.即使我在 Rabbit 消息的标头中将内容类型明确设置为“text/plain",它也会在日志接收器中显示为带有标头的 ..
发布时间:2021-08-20 20:08:47 其他开发

在 Spring Cloud Data Flow 中使用一个源、两个并行处理器和一个接收器创建 Stream

我正在尝试使用 在 Spring Cloud Data Flow 中创建一个流 一个来源,即订单来源和 Order 消息将发布到 RabbitMQ Topic/Queue. 两个并行处理器,即产品处理器和装运处理器这两个处理器都将是RabbitMQ Topic/Queue的订阅者并获取Order消息,每个处理器将单独处理这些Order消息并更新Order,Order消息将发布到Rab ..
发布时间:2021-08-20 20:08:41 其他开发

使用 Spring Cloud Dataflow 基于内容的路由

我需要使用 Spring Cloud Dataflow 实现基于内容的路由和过滤. 例如如果我的 processor 的输入是一个字符串.如果字符串包含 say xyz 我需要将它传递给我的 Spring Cloud Stream 中的组件/步骤 X;否则我希望它转到我的流的组件/步骤 Y. 我知道可以使用 router-sink 来实现相同的功能;但是我想使用路由\决策作为 proc ..
发布时间:2021-08-20 20:08:39 其他开发

Spring Cloud 数据流中的 Kafka 源

我正在从 Spring XD 迁移到 Spring Cloud Data Flow.当我在寻找模块列表时,我意识到 Spring Cloud Flow 中没有列出一些源——其中之一是 KAFKA 源. 我的问题是为什么 KAFKA 源从 Spring Cloud 数据流中的标准源列表中删除? 解决方案 当我在寻找模块列表时,我发现一些源没有在 Spring Cloud Flow ..

Spring Cloud Dataflow 自定义应用程序卡在部署状态

我创建了一个自定义 Spring Cloud 流处理器应用程序,并将其部署为 Source|Processor|Sink 流中的处理器步骤.一切似乎都运行良好,但我的自定义应用程序在数据流 UI 中显示“正在部署".如果这会影响任何事情,我会将它部署为来自 mavenLocal 的 SNAPSHOT.我是否遗漏了什么让 SCDF 知道部署成功? 解决方案 在这种情况下,通常的罪魁祸首是 B ..
发布时间:2021-08-20 20:08:15 其他开发

用于 HTTP 请求/响应交换的 Spring Cloud DataFlow

我想使用流来处理 HTTP 请求/响应交换.我没有看到任何具有 HTTP 接收器功能的 Spring Cloud Stream App Starters.我是否需要构建一个自定义接收器来处理响应?如果是这样,我是否通过我的处理管道传递请求,然后使用我的接收器中的请求来形成响应?我认为我没有误解 Spring Cloud DataFlow 和 Spring Cloud Stream 的用例.也许有适 ..
发布时间:2021-08-20 20:08:12 其他开发