spring-cloud-stream相关内容

带有Avro的Spring Cloud Stream无法正确转换字符串消息

我遇到一个问题,源发送了一个GenericMessage[有效负载=xxxxx,...]而接收器接收的消息为10,120,120,120,120,120。 此问题发生在我设置Avro消息转换器之后。如果我删除Avro消息转换器并使用StreamListener处理消息转换,它将工作得很好。 源应用程序.属性 spring.cloud.stream.bindings.toGreet ..
发布时间:2022-09-24 21:23:50 其他开发

基于APACHE-KAFKA-BINDER的春云流功能模型

这是question的续集。我可以把“普通”的阿帕奇卡夫卡活页夹和功能模型一起使用吗?到目前为止,使用基于注释的配置,我在一个应用程序中混合了spring-cloud-stream-binder-kafka和spring-cloud-stream-binder-kafka-streams两者,spring-cloud-stream-binder-kafka用于简单的使用/生产和spring-clo ..

PolledProcessor在Spring云数据流上的问题

我正在使用PolledProcessor实现一个Spring云数据流处理器。我遵循了这里的示例https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers。以下是我的代码。我将一个带有源管道的流部署到SCDF的这个处理器(源|轮询处理器),并让源发布了一些消息。我确认处理器每秒轮询来自SCDF Rabbitmq ..
发布时间:2022-09-24 20:37:37 其他开发

如何在Spring Cloud Stream中添加或调整文件供应商的配置

Spring Cloud Stream的文件-供应商依赖包括FileSupplierConfiguration,它配置一个文件消息源(来自Spring集成)。我需要定制这个@配置提供的FileReadingMessageSource,但我不确定最好的方法。它仅提供了控制其@Bean%s的基本属性,但我需要更多。 我考虑将FileSupplierConfiguration从应用程序中排除,但之 ..
发布时间:2022-09-24 20:30:09 其他开发

利用Spring Cloud数据流搭建异质系统场景下的数据交换

我有一个涉及用不同语言(Java、Python、C#等)编写的多个应用程序的用例,我想在这些应用程序之间创建几个流。下面给出了一个说明性的例子。 我正在尝试通过一个编排层找到完成此任务的最佳方法,该编排层完成所有幕后工作,包括记账、部署、消息队列创建和绑定等。 为此,我正在研究Spring Cloud数据流(SCDF)。然而,SCDF被设计为专门为Spring引导应用程序创建数据集成管 ..
发布时间:2022-09-24 20:25:50 其他开发

如何使用Spring Cloud Kafka Stream 3.1创建制片人

我知道如何使用命令式编程方法定义生产者,但我找不到如何使用函数式编程方法定义生产者。 我读了关于这个的Spring Cloud Stream Binder文档,但只找到了如何定义消费者,或者消费者和生产者(例如,从主题中获取信息,转换数据并发送到另一个主题)。 所以,我不知道是否可以继续使用像@Input、@Ouptut这样的批注来定义单个处理器,在这一点上我非常困惑,因为库表明这些批 ..
发布时间:2022-09-24 20:20:18 Java开发

当产生的消息是消费者没有引用的模型类型时,如何反序列化消费者中的消息?

我正在尝试反序列化从Conumers作用域之外的服务生成的消息。 如果序列化/反序列化模型包不同,则Kafka模型不在受信任的包中。会出现错误 所以我想我有两个选择。或者以json/字符串的形式生成消息,或者让消费者包含生产者库并使用相同的模型来反序列化消息。 我想在消费者中反序列化时手动忽略消息的标头类型,但我未能做到这一点。有没有办法做到这一点,或者我是否坚持使用上述两个选项? ..
发布时间:2022-05-06 18:32:36 其他开发

Spring Cloud Kafka Streams中的错误处理

我使用的是Spring Cloud Stream和Kafka Stream。假设我有一个处理器,它是一个将字符串的KStream转换为CityProgrammes的KStream的函数。它调用一个API来按名称查找City,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是转换过程中发生任何错误,整个应用程序都会停止。我想把这一条特别的信息发送给DLQ,然后继续前进。我已经读了几天了 ..

使用Kafka Streams绑定器和Function Style Processor处理Spring Cloud Streams中的异常处理示例

我将Spring Cloud Streams与Kafka Streams绑定器一起使用,功能 样式处理器API和多处理器。 以这种方式配置一个具有多个处理器和多个Kafka主题的处理应用程序,并使用/Actuator、WebClient等方式保持在Spring Boot领域,这真的很酷。事实上,我更喜欢它,而不是使用纯阿帕奇Kafka Streams。 但是:我希望为处理器内发生的异常 ..

Spring Aws Kinesis Binder ProvisionedThroughputExceededException,同时在批处理模式下消费消息

我正在使用批处理模式从 kinesis 流中提取记录.我们正在使用 spring aws kinesis binder. 大多数时候我们无法从流中提取消息.只有某些时候我们能够从流中提取消息. 我的配置如下所示 我的配置 弹簧:云:溪流:运动:粘合剂:锁:租期:30读取容量:1写入容量:1检查点:读取容量:1写入容量:1绑定:InStreamGroupOne:消费者:liste ..

Spring Aws Kinesis Binder ProvisionedThroughputExceededException,同时在批处理模式下使用消息

我正在使用批处理模式从 kinesis 流中提取记录.我们正在使用 spring aws kinesis binder. 大多数时候我们无法从流中提取消息.只有有时我们能够从流中提取消息. 我的配置如下 我的配置 弹簧:云:溪流:运动:粘合剂:锁:租期:30读取容量:1写入容量:1检查点:读取容量:1写入容量:1绑定:内流组一:消费者:侦听器模式:批处理idleBetweenP ..

是否可以使用 Spring Cloud Stream 进行一次处理?

目前我正在使用 SCS 和几乎默认配置的微服务之间发送和接收消息.不知怎的,我读过这个 https://www.confluent.io/blog/enabling-exactly-kafka-流 并想知道如果我们只是通过 Spring Boot 应用程序中的属性将名为“processing.guarantee"且值为“exactly-once"的属性放在那里,它会起作用吗? 解决方 ..