spring-cloud-stream相关内容
我遇到一个问题,源发送了一个GenericMessage[有效负载=xxxxx,...]而接收器接收的消息为10,120,120,120,120,120。 此问题发生在我设置Avro消息转换器之后。如果我删除Avro消息转换器并使用StreamListener处理消息转换,它将工作得很好。 源应用程序.属性 spring.cloud.stream.bindings.toGreet
..
我们正在有条件地触发Kafka消费者。如何知道该主题的所有消息是否都已被消费。 推荐答案 您可以使用ListenerContainerCustomizer@Bean在监听器容器上配置idleEventInterval,并使用@EventListener方法消费容器空闲事件。 参见https://docs.spring.io/spring-kafka/docs/current/re
..
这是question的续集。我可以把“普通”的阿帕奇卡夫卡活页夹和功能模型一起使用吗?到目前为止,使用基于注释的配置,我在一个应用程序中混合了spring-cloud-stream-binder-kafka和spring-cloud-stream-binder-kafka-streams两者,spring-cloud-stream-binder-kafka用于简单的使用/生产和spring-clo
..
我有StreamListener,我想使用新的功能模型和Consumer<;>;替换它。遗憾的是,我不知道如何将@Transaction转换到新模式: @Transactional @StreamListener(PaymentChannels.PENDING_PAYMENTS_INPUT) public void executePayments(PendingPaymentEven
..
我正在使用PolledProcessor实现一个Spring云数据流处理器。我遵循了这里的示例https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers。以下是我的代码。我将一个带有源管道的流部署到SCDF的这个处理器(源|轮询处理器),并让源发布了一些消息。我确认处理器每秒轮询来自SCDF Rabbitmq
..
Spring Cloud Stream的文件-供应商依赖包括FileSupplierConfiguration,它配置一个文件消息源(来自Spring集成)。我需要定制这个@配置提供的FileReadingMessageSource,但我不确定最好的方法。它仅提供了控制其@Bean%s的基本属性,但我需要更多。 我考虑将FileSupplierConfiguration从应用程序中排除,但之
..
我有一个涉及用不同语言(Java、Python、C#等)编写的多个应用程序的用例,我想在这些应用程序之间创建几个流。下面给出了一个说明性的例子。 我正在尝试通过一个编排层找到完成此任务的最佳方法,该编排层完成所有幕后工作,包括记账、部署、消息队列创建和绑定等。 为此,我正在研究Spring Cloud数据流(SCDF)。然而,SCDF被设计为专门为Spring引导应用程序创建数据集成管
..
我知道如何使用命令式编程方法定义生产者,但我找不到如何使用函数式编程方法定义生产者。 我读了关于这个的Spring Cloud Stream Binder文档,但只找到了如何定义消费者,或者消费者和生产者(例如,从主题中获取信息,转换数据并发送到另一个主题)。 所以,我不知道是否可以继续使用像@Input、@Ouptut这样的批注来定义单个处理器,在这一点上我非常困惑,因为库表明这些批
..
我正在尝试将StreamsUncaughtExceptionHandler添加到我的Kafka流处理器中。该处理器是用Kafka函数编写的。我查看了suggestion provided by Artem Bilan以将StreamsUncaughtExceptionHandler包括到我的服务中,但我的异常从未被它捕获/处理。 配置Bean: @Autowired UnCaughtE
..
我正在尝试反序列化从Conumers作用域之外的服务生成的消息。 如果序列化/反序列化模型包不同,则Kafka模型不在受信任的包中。会出现错误 所以我想我有两个选择。或者以json/字符串的形式生成消息,或者让消费者包含生产者库并使用相同的模型来反序列化消息。 我想在消费者中反序列化时手动忽略消息的标头类型,但我未能做到这一点。有没有办法做到这一点,或者我是否坚持使用上述两个选项?
..
我想看看能否通过Docker容器中的docker-compose连接Spring Cloud Stream Kafka,但是我被卡住了,还没有找到解决方案,请帮帮我。 我正在从Spring Microservices In Action开始工作;我现在找不到任何帮助。 Docker-与卡夫卡和ZooKeeper作曲: version: '2' services: zookee
..
我正在为我的Spring Kafka Streams应用程序的定制而苦苦挣扎。 我一直在尝试在我的KStreams中配置未捕获(运行时异常)处理。 参照文档https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html
..
我使用的是Spring Cloud Stream和Kafka Stream。假设我有一个处理器,它是一个将字符串的KStream转换为CityProgrammes的KStream的函数。它调用一个API来按名称查找City,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是转换过程中发生任何错误,整个应用程序都会停止。我想把这一条特别的信息发送给DLQ,然后继续前进。我已经读了几天了
..
我将Spring Cloud Streams与Kafka Streams绑定器一起使用,功能 样式处理器API和多处理器。 以这种方式配置一个具有多个处理器和多个Kafka主题的处理应用程序,并使用/Actuator、WebClient等方式保持在Spring Boot领域,这真的很酷。事实上,我更喜欢它,而不是使用纯阿帕奇Kafka Streams。 但是:我希望为处理器内发生的异常
..
我正在使用批处理模式从 kinesis 流中提取记录.我们正在使用 spring aws kinesis binder. 大多数时候我们无法从流中提取消息.只有某些时候我们能够从流中提取消息. 我的配置如下所示 我的配置 弹簧:云:溪流:运动:粘合剂:锁:租期:30读取容量:1写入容量:1检查点:读取容量:1写入容量:1绑定:InStreamGroupOne:消费者:liste
..
测试类:- @RunWith(SpringRunner.class)@SpringBootTest(classes = { WebsocketSourceConfiguration.class,WebSocketSourceIntegrationTests.class }, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, p
..
我正在使用批处理模式从 kinesis 流中提取记录.我们正在使用 spring aws kinesis binder. 大多数时候我们无法从流中提取消息.只有有时我们能够从流中提取消息. 我的配置如下 我的配置 弹簧:云:溪流:运动:粘合剂:锁:租期:30读取容量:1写入容量:1检查点:读取容量:1写入容量:1绑定:内流组一:消费者:侦听器模式:批处理idleBetweenP
..
我使用 Kafka Streams Binder 的 Spring Cloud Stream 应用程序具有以下属性: spring.cloud.stream.bindings:windowStream-in-0:目的地:输入windowStream-out-0:目的地:window1提示1Stream-in-0:目的地:window1提示1Stream-out-0:目的地:提示0 中的现实流:目
..
我的流处理器中有一个包含多个分区的主题,我只想从一个分区流式传输该主题,并且无法弄清楚如何配置此 spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=s-processorspring.cloud.stream.bindings.input.destination=uinputspring.cloud.
..
目前我正在使用 SCS 和几乎默认配置的微服务之间发送和接收消息.不知怎的,我读过这个 https://www.confluent.io/blog/enabling-exactly-kafka-流 并想知道如果我们只是通过 Spring Boot 应用程序中的属性将名为“processing.guarantee"且值为“exactly-once"的属性放在那里,它会起作用吗? 解决方
..