spring-cloud-stream相关内容
我正在以批处理模式消费消息.我想每 250 毫秒从流中提取 8 条消息. 弹簧:云:流:运动:绑定:输入:消费者:侦听器模式:批处理idleBetweenPolls: 250记录限制:8绑定:输入:组:我的组目的地:流内容类型:应用程序/json 我已将大约 100 条消息推送到流中,然后我启动了消费者. 根据配置,我应该每 250 毫秒提取一次消息.但是轮询器不会每 250 毫秒拉取一
..
我正在尝试将 Spring Integration 应用程序配置为中间件. 这是 application.yml 文件: 弹簧:应用:名称:${vcap.application.name:}云:流:绑定:我的输出:目的地:我的输入默认粘合剂:兔子 我得到的错误是这样的: 嵌套异常是 java.lang.IllegalStateException: Unknown binder conf
..
我正在关注这个模板用于Spring-cloud-stream-kafka 但在使生产者方法 transactional 时卡住了.我之前没有使用过 kafka 所以需要帮助以防 kafka 中需要任何配置更改 如果没有添加事务配置它运行良好,但是当添加事务配置时它在启动时超时 - 2020-11-21 15:07:55.349 ERROR 20432 --- [main] o.s.c.s.b
..
我不确定我的理解是否正确.我们启动了一个 spring 云流应用程序并订阅了一个主题.该应用程序将运行并监视新消息的主题,除非我们发送终止信号以退出.我在想我们是否可以明确退出spring cloud应用程序,比如等待5分钟没有新消息进来?还是处理了 1000 条记录就退出了? 解决方案 您不需要为此停止应用程序,因为它会带来整个 JVM(如果您愿意,您可以随时执行此操作).通常对于这样的
..
我有一个使用 spring-boot-cloud 和 apache-kafka 的项目,感谢 EmbeddedBroker,我有一个涵盖拓扑逻辑的集成测试列表. 我最近发现运行这些测试时日志中有很多噪音. 例如[Producer clientId=producer-2] 无法建立到节点 0 (localhost/127.0.0.1:63267) 的连接.经纪人可能不可用. 经过反
..
我打算为 Kafka 消费者弹性设置多个绑定.更具体地说,备份侦听器与主侦听器具有相同的目标和组,期望代理 IP.备份侦听器的自动启动在启动时关闭,但会在发生故障转移时以编程方式打开. 但是,微服务在启动时抛出以下异常: org.springframework.cloud.stream.binder.BinderException:启动消费者时抛出异常:在 org.springframew
..
我有多个微服务并且前端有 API,喜欢在单独的分区上为每个域事件使用相同的主题,我能够使用 配置 spring kafka binder 以发送到不同的分区 spring.cloud.stream.bindings..producer.partition-key-extractor-name= 实施 PartitionKeyExtractorStrategy 我的问题是我可以将 Kst
..
我使用: org.springframework.cloudspring-cloud-starter-stream-rabbit3.0.1.RELEASE 我需要禁用 Rabbit 来测试应用程序.我试过这个: 弹簧:云:配置:启用:假发现:启用:假 没
..
我正在尝试创建一个聚合器,我可以在其中侦听多条记录并将它们合并为一个.合并后,我通过在 listen() 方法中加入流和聚合应用程序来等待进程事件.流程事件到达时,会触发一些业务逻辑.我在一个 Spring Boot 项目中同时定义了聚合器和进程监听器. @Beanpublic Function, KStream
..
我们有一个使用 Kafka 的 Spring Cloud 流应用程序.要求是在生产者端,消息列表需要放在事务的主题中.同一个应用程序中的消息没有消费者.当我使用 spring.cloud.stream.kafka.binder.transaction.transaction-id 前缀启动事务时,我面临的错误是调度程序没有订阅者,并且从主题获得的分区总数小于交易配置.应用程序无法在事务模式下获取主
..
我正在尝试从 spring-cloud-stream 实现一个基本的处理器.我之前在其他项目上做过这个,所以我认为我很熟悉.但这一次 Spring 在通过 @Autowire 创建我的处理器引用时遇到了一个问题,@Service 组件. 我认为重要的部分是应用程序上的 @EnableBinding(my.class),但我有. 错误是 没有可用的“com.mycompany.conf
..
是否可以记录来自流函数的入站消息?是否有某种拦截器可以让我这样做? 解决方案 使用 KStream.peek(StreamLogger::in) 方法记录传入的字符串 :)
..
我目前一直在研究 Spring Cloud 模式注册表和融合模式注册表.我可以看到一些差异,例如 spring 云模式注册表将模式保存在普通数据库中,默认情况下保存在 h2 中,而融合模式注册表保存在 kafka 主题中. 对于 Spring Cloud 模式注册表,这种方法是否有任何性能影响.据我所知,即使数据在融合的情况下保持在主题上,查询它仍然会延迟.但会产生重大影响吗? 我还看
..
给出示例代码,当我取消注释 spring-cloud-sleuth-stream 依赖项时遇到以下异常: org.springframework.messaging.MessageHandlingException:方法参数类型 [class java.lang.String] 缺少标头“foo"在 org.springframework.messaging.handler.annotation
..
我是 Spring Cloud Stream 的新手.我的用例是从文件源读取并为文件中的每一行发布消息(到 Kafka).我曾尝试使用文件源应用程序启动器(https://github.com/spring-cloud-stream-app-starters/file/tree/master/spring-cloud-starter-stream-source-file)并有能够发布消息.
..
我想在自定义的 ConsumerInterceptor 中注入一个 bean,因为在 Sprint Cloud Stream 3.0.9.RELEASE 中添加了 ConsumerConfigCustomizer.但是,注入的 bean 始终为 NULL. Foo(要注入到 MyConsumerInterceptor 中的依赖项) public class Foo {公共无效foo(字符串
..
我正在尝试使用 RabbitMQ 配置一个简单的 Spring Cloud Stream 应用程序.我使用的代码主要取自 spring-cloud-stream-samples.我有一个入口点: @SpringBootApplication公共类演示应用{公共静态无效主(字符串 [] args){SpringApplication.run(DemoApplication.class, args)
..
在 spring-cloud-stream 中,有没有办法在不重启应用程序的情况下更改应用程序的实例计数和实例索引? 另外,有没有推荐的方法来自动填充这些值?在微服务世界中,这似乎非常困难,因为服务一直在启动和停止. 解决方案 在 spring-cloud-stream 中,有没有办法在不重启应用程序的情况下更改应用程序的实例计数和实例索引? 不在当前版本中,但可以在 Gi
..
我正在尝试找到 Spring Cloud 流的示例,它在其中为 RMQ 创建基于分区的生产者.我想看看它将如何为这些队列创建绑定,因为 RMQ 本身不支持主题的分区,但它会创建与分区数相等的队列数(我读过这个,我可能错了).首先,我想了解如何在基于分区的生产者的 RMQ 上使用 Spring Cloud 流创建生产者. 解决方案 @SpringBootApplication@EnableB
..
我正在尝试实现一个能够自动缩放的 spring boot aws kinesis 消费者,以便与原始实例共享负载(拆分处理分片). 我能够做的:使用定义明确的自述和此处提供的示例Kinesis binder docs 我已经能够启动多个消费者,通过提供这些属性来实际划分分片进行处理. 在生产者上,我通过应用程序属性提供 partitionCount: 2.在消费者身上,我提供了 ins
..