一个 SCDF 源,2 个处理器,但每个项目只有 1 个进程 [英] one SCDF source, 2 processors but only 1 processes each item
问题描述
我的用例是对此的一个变体:
processorA 在上面,processorB 在下面
当我部署并运行它时,源发出 A、B 和 C,然后 处理器 1 和处理器 2 接收 A、B 和 C
我很困惑,如果我想要的行为是我可以在 Spring Cloud Data Flow 中发生的事情,或者是否有 RabbitMQ 设置(如消息删除的答案所暗示的那样)
就是当你设置自动确认标志时发生的事情.这样,消息一被消费就会被确认——所以从队列中消失了."
如果是这样的话,我可以在我的 Spring Cloud Data Flow 源中设置它还是它是一个 RabbitMQ 设置还是完全其他的东西
更新:
我已添加
spring.cloud.stream.bindings.input.group=consumerGroup
到我的处理器的 application.properties 文件.
不幸的是,两个处理器都接收到完全相同的数据.
我是否需要向我的源的 application.properties 添加类似的条目?
我需要更改处理器上的注释吗?目前,它是:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
我是否需要以任何方式修改源代码上的注释?目前,它是:
@Bean@InboundChannelAdapter(value = Source.OUTPUT, poller =@Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
包含@Poller 是否会以任何方式改变这一点?
更新:
属性是否名为 spring.cloud.stream.instanceCount?
对于流应用,您需要设置 ...consumer.group 属性,以便它们都在同一组中并竞争消息.
但这应该在 SCDF 中自动发生.
My use case is a variation on this:
Create Stream with one source, two parallel processors and one sink in Spring Cloud Data Flow
In the example, 1 source emits an item to rabbitmq and both processors get it.
I want the opposite. I want the source to emit items to rabbitmq but only 1 processor handles each item.
Lets pretend I have:
1 source named source 2 processors named processor1 and processor2
So source emits: A, B, C to rabbitmq
RabbitMQ will emit A
Whichever processor gets A first will process it - lets say processor1 is the lucky one and handles A.
Then RabbitMQ will emit B
Since processor1 is busy with A and processor2 is idle processor2 handles B
RabbitMQ will emits C
processor1 finished with A and is idle so processor1 handles C
The Spring Cloud Data Flow graph I came up with is:
processorA is the one on top, processorB is the lower one
When I deploy this and run it, source emits A, B and C then both processor1 and processor2 receive A, B and then C
I'm confused if the behavior I want is something I can make happen in Spring Cloud Data Flow OR if there is a RabbitMQ setting for this as implied by the answer that says message removal
"is what is happening when you set the auto-acknowledge flag. In that way, the message is acknowledged as soon as it's consumed - so gone from the queue."
If that's the case, can I set it in my Spring Cloud Data Flow source OR is it a RabbitMQ setting or is it something else entirely
UPDATE:
I have added
spring.cloud.stream.bindings.input.group=consumerGroup
to the application.properties file of my processor.
Unfortunately, both processors are receiving the exact same data.
Do I need to add a similar entry to the application.properties of my source?
Do I need to change the annotation on the processor? Currently, it is:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
Do I need to modify the annotation on the source in any fashion? Currently, it is:
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller =
@Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
Does the inclusion of @Poller change this in any fashion?
UPDATE:
Is the property named spring.cloud.stream.instanceCount?
For stream apps, you need to set the ...consumer.group property so they are both in the same group and compete for messages.
But that should happen automatically with SCDF.
这篇关于一个 SCDF 源,2 个处理器,但每个项目只有 1 个进程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!