spring-cloud-stream-binder-kafka相关内容

基于APACHE-KAFKA-BINDER的春云流功能模型

这是question的续集。我可以把“普通”的阿帕奇卡夫卡活页夹和功能模型一起使用吗?到目前为止,使用基于注释的配置,我在一个应用程序中混合了spring-cloud-stream-binder-kafka和spring-cloud-stream-binder-kafka-streams两者,spring-cloud-stream-binder-kafka用于简单的使用/生产和spring-clo ..

使用Kafka Streams绑定器和Function Style Processor处理Spring Cloud Streams中的异常处理示例

我将Spring Cloud Streams与Kafka Streams绑定器一起使用,功能 样式处理器API和多处理器。 以这种方式配置一个具有多个处理器和多个Kafka主题的处理应用程序,并使用/Actuator、WebClient等方式保持在Spring Boot领域,这真的很酷。事实上,我更喜欢它,而不是使用纯阿帕奇Kafka Streams。 但是:我希望为处理器内发生的异常 ..

如果在处理步骤中发生故障,如何使 Spring Cloud 流 Kafka 流绑定器重试处理消息?

我正在使用 Spring Cloud Stream 处理 Kafka Streams.在消息处理应用程序中,有可能会产生错误.所以消息不应该被提交并再次重试. 我的申请方法- @Beanpublic Function, KStream>过程() {返回(输入)->{KStreamkt = input.flatMap ..

Spring Cloud @StreamListener 条件已弃用什么是替代方案

我们有多个应用程序消费者监听同一个 kafka 主题,生产者在向主题发送消息时设置消息头,以便特定实例可以评估头并处理消息.例如 @StreamListener(target=ITestSink.CHANNEL_NAME,condition=“headers['franchiseName'] == 'sydney'")public void fullfillOrder(@Payload Test ..

num.stream.threads 创建空闲线程

我有一个带有 2 个主题的 Spring Boot kafka 流应用程序,考虑主题 A 和 B.主题 A 有 16 个分区,主题 B 有 1 个分区.考虑将应用程序部署在具有 num.stream.threads=16.我运行 kafka-consumer-groups.bat 命令来检查线程如何分配给组中的分区,得到以下输出.主题 A 和 B 分配了 16 个线程,其中主题 B 中的 14 个 ..

Kafka Streams:使用 Spring Cloud Stream 为每组主题定义多个 Kafka Streams

我正在尝试使用 Kafka Streams 做一个简单的 POC.但是,我在启动应用程序时遇到异常.我正在使用 Spring-Kafka、Kafka-Streams 2.5.1 和 Spring Boot 2.3.5Kafka流配置 @Configuration公共类 KafkaStreamsConfig {私有静态最终记录器日志 = LoggerFactory.getLogger(KafkaS ..

如何配置 Spring Cloud Stream (Kafka) 应用程序以在 Confluent Cloud 中自动创建主题?

有没有办法让(Spring Cloud Stream)应用程序在 Confluent Cloud 中自动创建他们需要的主题? 到目前为止,我不得不手动创建它们,当您考虑还必须设置变更日志主题时,这很容易出错. 解决方案 binder 属性 autoCreateTopics 是 默认为true.因此应该自动创建主题(除非代理权限不允许 - 但我希望在这种情况下会在日志中看到错误). ..

Spring Cloud Streams 应用程序抛出嵌套异常是域模型类的 java.lang.ClassCastException

嗨,我正在为 Kafka 试用最新的 Spring 云流框架.但是,对于 String 和 Double 其工作正常,但是当我尝试发送 Java POJO 类时,它会引发以下异常. 我尝试了各种用于序列化和反序列化的配置,但似乎没有任何效果.我能够以 json 格式从供应商处生成消息,但由于错误,消费者无法处理它. 对此问题的任何建议将不胜感激.谢谢 org.springframew ..

Spring Cloud Stream kafka 事务配置

我正在关注这个模板用于Spring-cloud-stream-kafka 但在使生产者方法 transactional 时卡住了.我之前没有使用过 kafka 所以需要帮助以防 kafka 中需要任何配置更改 如果没有添加事务配置它运行良好,但是当添加事务配置时它在启动时超时 - 2020-11-21 15:07:55.349 ERROR 20432 --- [main] o.s.c.s.b ..

使用 Spring Cloud Stream 无法在消费者端设置到同一目的地和组的多个绑定

我打算为 Kafka 消费者弹性设置多个绑定.更具体地说,备份侦听器与主侦听器具有相同的目标和组,期望代理 IP.备份侦听器的自动启动在启动时关闭,但会在发生故障转移时以编程方式打开. 但是,微服务在启动时抛出以下异常: org.springframework.cloud.stream.binder.BinderException:启动消费者时抛出异常:在 org.springframew ..

KafkaBindingRebalanceListener Bean 不是由 KafkaMessageChannelBinder Bean 自动装配的

文档非常直接,它建议公开 KafkaBindingRebalanceListener 类型的 Bean,并且将在内部调用 onPartitiosnAssigned 方法.我正在尝试做同样的事情,并且在 spring 框架创建其 KafkaMessageChannelBinder Bean 时,ObjectProvider.getIfUnique() 总是返回 null,因为它无法找到所需的 bea ..

Spring Cloud Streams 应用程序抛出嵌套异常是域模型类的 java.lang.ClassCastException

嗨,我正在为 Kafka 试用最新的 Spring 云流框架.但是,对于 String 和 Double 其工作正常,但是当我尝试发送 Java POJO 类时,它会引发以下异常. 我尝试了各种用于序列化和反序列化的配置,但似乎没有任何效果.我能够以 json 格式从供应商处生成消息,但由于错误,消费者无法处理它. 对此问题的任何建议将不胜感激.谢谢 org.springframew ..

使用spring cloud stream kafka读取消息的编程方式

我有一个主题和与之相关的 DLQ.我正在使用@StreamListener 作为主题.我想使用控制器端点按需读取/处理来自 DLQ 的消息. 是否可以使用 Spring Cloud Stream Kafka 来做到这一点. 我们没有在生产中使用执行器.所以不能使用/bindings 端点. 解决方案 您不需要通过 web 启用执行器,但需要将执行器启动器添加到类路径;那么你可以 ..

Kafka Streams:使用Spring Cloud Stream为每组主题定义多个Kafka Streams

我正在尝试使用Kafka Streams做一个简单的POC.但是,启动应用程序时出现异常.我正在使用Spring Boot 2.3.5的Spring-Kafka,Kafka-Streams 2.5.1Kafka流配置 @Configuration公共类KafkaStreamsConfig {私有静态最终Logger日志= LoggerFactory.getLogger(KafkaStreams ..

num.stream.threads创建空闲线程

我有一个带有两个主题的spring boot kafka流应用程序,考虑主题A和B.主题A有16个分区,主题B有1个分区.考虑将应用程序部署在1个实例中,该实例的num.stream.threads = 16. 我运行kafka-consumer-groups.bat命令来检查如何将线程分配给组中的分区,得到以下输出.主题A和B分配了16个线程,其中主题B中的14个线程处于空闲状态. kaf ..

Spring Cloud Stream 3.0存在生产者问题

我阅读了有关Spring cloud stream 3.0的文档,以使用java.util.function.[Supplier/Function/Consumer]理解新的内容来代表生产者,消费和生产,消费者,这应该是正确的./p> 但是我不了解供应商. 文档指出,对供应商的轮询用于始终如一地为供应商生成数据,并且不需要计划的参与. 但是很多时候,我们需要在特定时间生成数据,例如W ..