spring-cloud-stream相关内容

IdleBetween 池未按指定提取消息

我正在以批处理模式消费消息.我想每 250 毫秒从流中提取 8 条消息. 弹簧:云:流:运动:绑定:输入:消费者:侦听器模式:批处理idleBetweenPolls: 250记录限制:8绑定:输入:组:我的组目的地:流内容类型:应用程序/json 我已将大约 100 条消息推送到流中,然后我启动了消费者. 根据配置,我应该每 250 毫秒提取一次消息.但是轮询器不会每 250 毫秒拉取一 ..

Spring 集成:Binder 配置(适用于 Rabbit)

我正在尝试将 Spring Integration 应用程序配置为中间件. 这是 application.yml 文件: 弹簧:应用:名称:${vcap.application.name:}云:流:绑定:我的输出:目的地:我的输入默认粘合剂:兔子 我得到的错误是这样的: 嵌套异常是 java.lang.IllegalStateException: Unknown binder conf ..
发布时间:2021-08-20 20:11:30 其他开发

Spring Cloud Stream kafka 事务配置

我正在关注这个模板用于Spring-cloud-stream-kafka 但在使生产者方法 transactional 时卡住了.我之前没有使用过 kafka 所以需要帮助以防 kafka 中需要任何配置更改 如果没有添加事务配置它运行良好,但是当添加事务配置时它在启动时超时 - 2020-11-21 15:07:55.349 ERROR 20432 --- [main] o.s.c.s.b ..

spring cloud stream - spring cloud 流应用程序可以明确退出吗?

我不确定我的理解是否正确.我们启动了一个 spring 云流应用程序并订阅了一个主题.该应用程序将运行并监视新消息的主题,除非我们发送终止信号以退出.我在想我们是否可以明确退出spring cloud应用程序,比如等待5分钟没有新消息进来?还是处理了 1000 条记录就退出了? 解决方案 您不需要为此停止应用程序,因为它会带来整个 JVM(如果您愿意,您可以随时执行此操作).通常对于这样的 ..
发布时间:2021-08-20 20:11:21 其他开发

从 spring-cloud-stream 访问 kafka 生产者工厂

我有一个使用 spring-boot-cloud 和 apache-kafka 的项目,感谢 EmbeddedBroker,我有一个涵盖拓扑逻辑的集成测试列表. 我最近发现运行这些测试时日志中有很多噪音. 例如[Producer clientId=producer-2] 无法建立到节点 0 (localhost/127.0.0.1:63267) 的连接.经纪人可能不可用. 经过反 ..
发布时间:2021-08-20 20:11:18 其他开发

使用 Spring Cloud Stream 无法在消费者端设置到同一目的地和组的多个绑定

我打算为 Kafka 消费者弹性设置多个绑定.更具体地说,备份侦听器与主侦听器具有相同的目标和组,期望代理 IP.备份侦听器的自动启动在启动时关闭,但会在发生故障转移时以编程方式打开. 但是,微服务在启动时抛出以下异常: org.springframework.cloud.stream.binder.BinderException:启动消费者时抛出异常:在 org.springframew ..

Spring Cloud Stream 主题分区 KStream 读/写

我有多个微服务并且前端有 API,喜欢在单独的分区上为每个域事件使用相同的主题,我能够使用 配置 spring kafka binder 以发送到不同的分区 spring.cloud.stream.bindings..producer.partition-key-extractor-name= 实施 PartitionKeyExtractorStrategy 我的问题是我可以将 Kst ..
发布时间:2021-08-20 20:11:12 其他开发

创建一个 Kafka 聚合器并加入一个事件

我正在尝试创建一个聚合器,我可以在其中侦听多条记录并将它们合并为一个.合并后,我通过在 listen() 方法中加入流和聚合应用程序来等待进程事件.流程事件到达时,会触发一些业务逻辑.我在一个 Spring Boot 项目中同时定义了聚合器和进程监听器. @Beanpublic Function, KStream ..
发布时间:2021-08-20 20:11:06 其他开发

Spring Cloud 在生产者端流式传输 kafka 事务

我们有一个使用 Kafka 的 Spring Cloud 流应用程序.要求是在生产者端,消息列表需要放在事务的主题中.同一个应用程序中的消息没有消费者.当我使用 spring.cloud.stream.kafka.binder.transaction.transaction-id 前缀启动事务时,我面临的错误是调度程序没有订阅者,并且从主题获得的分区总数小于交易配置.应用程序无法在事务模式下获取主 ..
发布时间:2021-08-20 20:11:03 其他开发

为什么 Spring 不能自动装配我的云流处理器?

我正在尝试从 spring-cloud-stream 实现一个基本的处理器.我之前在其他项目上做过这个,所以我认为我很熟悉.但这一次 Spring 在通过 @Autowire 创建我的处理器引用时遇到了一个问题,@Service 组件. 我认为重要的部分是应用程序上的 @EnableBinding(my.class),但我有. 错误是 没有可用的“com.mycompany.conf ..
发布时间:2021-08-20 20:11:00 其他开发

Confluent 模式注册表与 Spring 云模式注册表

我目前一直在研究 Spring Cloud 模式注册表和融合模式注册表.我可以看到一些差异,例如 spring 云模式注册表将模式保存在普通数据库中,默认情况下保存在 h2 中,而融合模式注册表保存在 kafka 主题中. 对于 Spring Cloud 模式注册表,这种方法是否有任何性能影响.据我所知,即使数据在融合的情况下保持在主题上,查询它仍然会延迟.但会产生重大影响吗? 我还看 ..

从 spring-cloud-stream app starter 修改消息体

我是 Spring Cloud Stream 的新手.我的用例是从文件源读取并为文件中的每一行发布消息(到 Kafka).我曾尝试使用文件源应用程序启动器(https://github.com/spring-cloud-stream-app-starters/file/tree/master/spring-cloud-starter-stream-source-file)并有能够发布消息. ..
发布时间:2021-08-20 20:10:44 其他开发

Spring Cloud Stream 不创建队列

我正在尝试使用 RabbitMQ 配置一个简单的 Spring Cloud Stream 应用程序.我使用的代码主要取自 spring-cloud-stream-samples.我有一个入口点: @SpringBootApplication公共类演示应用{公共静态无效主(字符串 [] args){SpringApplication.run(DemoApplication.class, args) ..
发布时间:2021-08-20 20:10:32 其他开发

在运行时更改 spring-cloud-stream 实例索引/计数

在 spring-cloud-stream 中,有没有办法在不重启应用程序的情况下更改应用程序的实例计数和实例索引? 另外,有没有推荐的方法来自动填充这些值?在微服务世界中,这似乎非常困难,因为服务一直在启动和停止. 解决方案 在 spring-cloud-stream 中,有没有办法在不重启应用程序的情况下更改应用程序的实例计数和实例索引? 不在当前版本中,但可以在 Gi ..
发布时间:2021-08-20 20:10:29 其他开发

如何使用 RMQ 和 Spring Cloud 流创建基于分区的生产者?

我正在尝试找到 Spring Cloud 流的示例,它在其中为 RMQ 创建基于分区的生产者.我想看看它将如何为这些队列创建绑定,因为 RMQ 本身不支持主题的分区,但它会创建与分区数相等的队列数(我读过这个,我可能错了).首先,我想了解如何在基于分区的生产者的 RMQ 上使用 Spring Cloud 流创建生产者. 解决方案 @SpringBootApplication@EnableB ..
发布时间:2021-08-20 20:10:24 Java开发

spring 云流 kinesis Binder

我正在尝试实现一个能够自动缩放的 spring boot aws kinesis 消费者,以便与原始实例共享负载(拆分处理分片). 我能够做的:使用定义明确的自述和此处提供的示例Kinesis binder docs 我已经能够启动多个消费者,通过提供这些属性来实际划分分片进行处理. 在生产者上,我通过应用程序属性提供 partitionCount: 2.在消费者身上,我提供了 ins ..