spring-cloud-stream相关内容

Spring Cloud Stream动态频道

我正在使用Spring Cloud Stream,并想以编程方式创建和绑定频道.我的用例是,在应用程序启动期间,我会收到要订阅的Kafka主题的动态列表.然后如何为每个主题创建频道? 解决方案 我最近遇到了类似的情况,下面是我动态创建SubscriberChannel的示例. ConsumerProperties ConsumerProperties =新的ConsumerProper ..
发布时间:2021-04-08 18:50:08 其他开发

Spring Cloud Stream Kafka暂停/恢复活页夹

我们正在使用Spring Cloude流 2.0 &Kafka作为消息代理. 对于目标系统(DB或第三方API)无法使用的情况,我们已实现了一个断路器,该断路器可停止应用程序上下文,如此处所示:绑定可视化和控件 是否有可能通过代码控制活页夹的生命周期,是否意味着在目标服务器关闭的情况下暂停活页夹,以及在启动时恢复 resume ? 解决方案 对不起,我没看懂你的问题. 您可以 ..

Kubernetes上的Spring Cloud Dataflow部署时出现错误

我正在开发Spring Cloud Dataflow Stream应用程序.我能够使用Cloud Foundry中运行的船长来运行Spring云数据流服务器.现在,即使我在部署中的环境配置中明确给出了用户名,我仍试图与在kubernetes集群中运行的船长运行相同的代码,并在部署时遇到错误. 原因:io.fabric8.kubernetes.client.KubernetesClientEx ..

杰克逊无法反序列化(Spring Cloud Stream Kafka)

我正在尝试从kafka中读取json消息,但出现了一个异常,该异常表示Jackson无法将json反序列化为POJO. json就像{"code":"500","count":22,"from":1528343820000,"to":1528343880000},它是kafka流的输出. POJO声明json的所有属性,并且与生成JSON消息的POJO完全相同.所以我不知道为什么会这样 ..
发布时间:2021-02-09 20:13:56 其他开发

Spring Cloud Dataflow有什么好处?

根据我所看到的,在Spring Cloud Dataflow(SCDF)中创建流将部署基础应用程序,绑定通信服务(如RabbitMQ),设置Spring Cloud Stream环境变量并启动应用程序。 同时,我一直在使用Spring Cloud Dataflow遇到一些缺点: SCDF服务器是PCF上的内存猪(我的流只有6个应用程序,但服务器大约需要10GB) 在应用程序命名,内存 ..

在获取数据Spring-Cloud-Stream Kafka Azure事件中心时出现意外错误代码13-

自12月18日以来,此问题突然弹出,一切正常,找不到任何线索,这个异常情况在经过6个月的正常工作后突然不断出现. 我无法在本地独立环境中进行复制,但是它出现在具有微服务的docker映像中,其中之一与 Azure even hub 进行通信. > 我已经搜索并找到了发布.在这篇文章中,我也遇到了同样的问题,但是找不到任何解决方法或线索,所以我发布了这个问题. 请参考下面的堆栈跟踪: ..

使用Spring Cloud Stream处理一次是否可以获得准确的处理?

当前,我正在使用具有几乎默认配置的SCS在微服务之间发送和接收消息. 不知何故我已经读过了 https://www.confluent.io/blog/enabling-exactly-kafka-流 并想知道我们是否只是通过Spring启动应用程序中的属性在其中放置值为"exactly-once"的"processing.guarantee"属性,是否可行? 解决方案 在您的问题 ..

如何在spring-cloud-stream中的kafka进程拓扑中使用交互式查询?

是否可以在Spring Cloud Stream中使用具有@EnableBinding批注的类或在具有@StreamListener的方法中使用交互式查询(InteractiveQueryService)?我尝试实例化提供的我的@StreamListener方法正在监听一堆KTables和KStreams,并且在过程拓扑(例如过滤)期间,我必须检查KStream的键是否已存在于特定的KTable中 ..

根据部分数据属性更新KTable

我正在尝试使用对象的部分数据更新KTable. 例如.用户对象是 {"id":1, "name":"Joe", "age":28} 该对象将流式传输到主题中,并按关键字分组到KTable中. 现在,用户对象按{"id":1, "age":33}部分更新,并流式传输到表中.但是更新的表如下所示{"id":1, "name":null, "age":28}. 预期的输出是{"id":1, "name" ..

num.stream.threads创建空闲线程

我有一个带有两个主题的spring boot kafka流应用程序,考虑主题A和B.主题A有16个分区,主题B有1个分区.考虑将应用程序部署在1个实例中,该实例的num.stream.threads = 16. 我运行kafka-consumer-groups.bat命令来检查如何将线程分配给组中的分区,得到以下输出.主题A和B分配了16个线程,其中主题B中的14个线程处于空闲状态. kaf ..

Spring Cloud Stream 3.0存在生产者问题

我阅读了有关Spring cloud stream 3.0的文档,以使用java.util.function.[Supplier/Function/Consumer]理解新的内容来代表生产者,消费和生产,消费者,这应该是正确的./p> 但是我不了解供应商. 文档指出,对供应商的轮询用于始终如一地为供应商生成数据,并且不需要计划的参与. 但是很多时候,我们需要在特定时间生成数据,例如W ..

Spring Cloud功能-针对不同使用者的单独路由表达

我有一个服务,该服务从不同的消息队列接收不同的结构化消息.有了@StreamListener conditions,我们可以在每种消息类型中选择如何处理该消息.例如: 我们收到两种不同类型的消息,它们具有不同的标头字段和值,例如 从“订单"队列中收到: Order1: { Header: {catalog:groceries} } Order2: { Header: {catal ..
发布时间:2020-07-25 19:22:25 其他开发

Spring Cloud Stream多种功能定义

是否可以使用具有多个单独功能/绑定的功能(spring.cloud.function)风格的反应式SCS应用程序?我发现的所有示例始终仅使用默认绑定input, output注册一个功能Bean.我想注册多个,每个都有自己的绑定. 传统上,可以使用spring-cloud-stream-reactive完成此操作,但现在不建议使用它,而提供功能支持. 解决方案 是的,有可能出现在最新 ..
发布时间:2020-07-25 19:22:22 其他开发

当目标系统关闭时,阻止Spring Cloud Stream @StreamListener监听

我有一个应用程序,可以从Kafka获取消息并调用目标系统来更新旧版Oracle DB. 我想启用一种方案,如果目标系统关闭,则将消息保留在Kafka总线上,并且在给定的时间内不对其进行处理.我当时在想一些基于断路器Hystrix的解决方案,但是我找不到任何机制可以告诉Spring Cloud Stream“停止"事件监听.我能想到的唯一其他选择是,如果断路器是断开的,则将这些消息传送到错误/ ..