spring-cloud-stream-binder-kafka相关内容
这是question的续集。我可以把“普通”的阿帕奇卡夫卡活页夹和功能模型一起使用吗?到目前为止,使用基于注释的配置,我在一个应用程序中混合了spring-cloud-stream-binder-kafka和spring-cloud-stream-binder-kafka-streams两者,spring-cloud-stream-binder-kafka用于简单的使用/生产和spring-clo
..
我将Spring Cloud Streams与Kafka Streams绑定器一起使用,功能 样式处理器API和多处理器。 以这种方式配置一个具有多个处理器和多个Kafka主题的处理应用程序,并使用/Actuator、WebClient等方式保持在Spring Boot领域,这真的很酷。事实上,我更喜欢它,而不是使用纯阿帕奇Kafka Streams。 但是:我希望为处理器内发生的异常
..
我正在使用 Spring Cloud Stream 处理 Kafka Streams.在消息处理应用程序中,有可能会产生错误.所以消息不应该被提交并再次重试. 我的申请方法- @Beanpublic Function, KStream>过程() {返回(输入)->{KStreamkt = input.flatMap
..
我们有多个应用程序消费者监听同一个 kafka 主题,生产者在向主题发送消息时设置消息头,以便特定实例可以评估头并处理消息.例如 @StreamListener(target=ITestSink.CHANNEL_NAME,condition=“headers['franchiseName'] == 'sydney'")public void fullfillOrder(@Payload Test
..
我有一个带有 2 个主题的 Spring Boot kafka 流应用程序,考虑主题 A 和 B.主题 A 有 16 个分区,主题 B 有 1 个分区.考虑将应用程序部署在具有 num.stream.threads=16.我运行 kafka-consumer-groups.bat 命令来检查线程如何分配给组中的分区,得到以下输出.主题 A 和 B 分配了 16 个线程,其中主题 B 中的 14 个
..
我正在尝试使用 Kafka Streams 做一个简单的 POC.但是,我在启动应用程序时遇到异常.我正在使用 Spring-Kafka、Kafka-Streams 2.5.1 和 Spring Boot 2.3.5Kafka流配置 @Configuration公共类 KafkaStreamsConfig {私有静态最终记录器日志 = LoggerFactory.getLogger(KafkaS
..
有没有办法让(Spring Cloud Stream)应用程序在 Confluent Cloud 中自动创建他们需要的主题? 到目前为止,我不得不手动创建它们,当您考虑还必须设置变更日志主题时,这很容易出错. 解决方案 binder 属性 autoCreateTopics 是 默认为true.因此应该自动创建主题(除非代理权限不允许 - 但我希望在这种情况下会在日志中看到错误).
..
嗨,我正在为 Kafka 试用最新的 Spring 云流框架.但是,对于 String 和 Double 其工作正常,但是当我尝试发送 Java POJO 类时,它会引发以下异常. 我尝试了各种用于序列化和反序列化的配置,但似乎没有任何效果.我能够以 json 格式从供应商处生成消息,但由于错误,消费者无法处理它. 对此问题的任何建议将不胜感激.谢谢 org.springframew
..
我正在关注这个模板用于Spring-cloud-stream-kafka 但在使生产者方法 transactional 时卡住了.我之前没有使用过 kafka 所以需要帮助以防 kafka 中需要任何配置更改 如果没有添加事务配置它运行良好,但是当添加事务配置时它在启动时超时 - 2020-11-21 15:07:55.349 ERROR 20432 --- [main] o.s.c.s.b
..
我打算为 Kafka 消费者弹性设置多个绑定.更具体地说,备份侦听器与主侦听器具有相同的目标和组,期望代理 IP.备份侦听器的自动启动在启动时关闭,但会在发生故障转移时以编程方式打开. 但是,微服务在启动时抛出以下异常: org.springframework.cloud.stream.binder.BinderException:启动消费者时抛出异常:在 org.springframew
..
我想在自定义的 ConsumerInterceptor 中注入一个 bean,因为在 Sprint Cloud Stream 3.0.9.RELEASE 中添加了 ConsumerConfigCustomizer.但是,注入的 bean 始终为 NULL. Foo(要注入到 MyConsumerInterceptor 中的依赖项) public class Foo {公共无效foo(字符串
..
文档非常直接,它建议公开 KafkaBindingRebalanceListener 类型的 Bean,并且将在内部调用 onPartitiosnAssigned 方法.我正在尝试做同样的事情,并且在 spring 框架创建其 KafkaMessageChannelBinder Bean 时,ObjectProvider.getIfUnique() 总是返回 null,因为它无法找到所需的 bea
..
嗨,我正在为 Kafka 试用最新的 Spring 云流框架.但是,对于 String 和 Double 其工作正常,但是当我尝试发送 Java POJO 类时,它会引发以下异常. 我尝试了各种用于序列化和反序列化的配置,但似乎没有任何效果.我能够以 json 格式从供应商处生成消息,但由于错误,消费者无法处理它. 对此问题的任何建议将不胜感激.谢谢 org.springframew
..
我有一个主题和与之相关的 DLQ.我正在使用@StreamListener 作为主题.我想使用控制器端点按需读取/处理来自 DLQ 的消息. 是否可以使用 Spring Cloud Stream Kafka 来做到这一点. 我们没有在生产中使用执行器.所以不能使用/bindings 端点. 解决方案 您不需要通过 web 启用执行器,但需要将执行器启动器添加到类路径;那么你可以
..
我正在尝试使用Kafka Streams做一个简单的POC.但是,启动应用程序时出现异常.我正在使用Spring Boot 2.3.5的Spring-Kafka,Kafka-Streams 2.5.1Kafka流配置 @Configuration公共类KafkaStreamsConfig {私有静态最终Logger日志= LoggerFactory.getLogger(KafkaStreams
..
我正在使用Spring Cloud Stream开发Kafka Streams.在消息处理应用程序中,可能会产生错误.因此,不应再次提交并重试该消息. 我的申请方法- @Bean public Function, KStream> process() { return (input) -> { KStream
..
我有一个带有两个主题的spring boot kafka流应用程序,考虑主题A和B.主题A有16个分区,主题B有1个分区.考虑将应用程序部署在1个实例中,该实例的num.stream.threads = 16. 我运行kafka-consumer-groups.bat命令来检查如何将线程分配给组中的分区,得到以下输出.主题A和B分配了16个线程,其中主题B中的14个线程处于空闲状态. kaf
..
我正在研究Spring Cloud Stream Apache Kafka示例.我正在开发引用以下代码的代码: https://www.youtube.com/watch?v= YPDzcmqwCNo . org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'pvo
..
我阅读了有关Spring cloud stream 3.0的文档,以使用java.util.function.[Supplier/Function/Consumer]理解新的内容来代表生产者,消费和生产,消费者,这应该是正确的./p> 但是我不了解供应商. 文档指出,对供应商的轮询用于始终如一地为供应商生成数据,并且不需要计划的参与. 但是很多时候,我们需要在特定时间生成数据,例如W
..